start all of the sends in parallel (from the same buffer) - wait for
all to complete This commit was SVN r7935.
Этот коммит содержится в:
родитель
a891db81e9
Коммит
cf5c27c1e3
@ -18,6 +18,7 @@
|
|||||||
#include <string.h>
|
#include <string.h>
|
||||||
|
|
||||||
#include "include/constants.h"
|
#include "include/constants.h"
|
||||||
|
#include "opal/threads/condition.h"
|
||||||
#include "opal/util/output.h"
|
#include "opal/util/output.h"
|
||||||
#include "util/proc_info.h"
|
#include "util/proc_info.h"
|
||||||
#include "orte/dps/dps.h"
|
#include "orte/dps/dps.h"
|
||||||
@ -42,6 +43,44 @@
|
|||||||
* continuing to forward data along the distribution tree.
|
* continuing to forward data along the distribution tree.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
|
||||||
|
struct mca_oob_xcast_t {
|
||||||
|
opal_object_t super;
|
||||||
|
opal_mutex_t mutex;
|
||||||
|
opal_condition_t cond;
|
||||||
|
size_t counter;
|
||||||
|
};
|
||||||
|
typedef struct mca_oob_xcast_t mca_oob_xcast_t;
|
||||||
|
|
||||||
|
static void mca_oob_xcast_construct(mca_oob_xcast_t* xcast)
|
||||||
|
{
|
||||||
|
OBJ_CONSTRUCT(&xcast->mutex, opal_mutex_t);
|
||||||
|
OBJ_CONSTRUCT(&xcast->cond, opal_condition_t);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void mca_oob_xcast_destruct(mca_oob_xcast_t* xcast)
|
||||||
|
{
|
||||||
|
OBJ_DESTRUCT(&xcast->mutex);
|
||||||
|
OBJ_DESTRUCT(&xcast->cond);
|
||||||
|
}
|
||||||
|
|
||||||
|
static OBJ_CLASS_INSTANCE(
|
||||||
|
mca_oob_xcast_t,
|
||||||
|
opal_object_t,
|
||||||
|
mca_oob_xcast_construct,
|
||||||
|
mca_oob_xcast_destruct);
|
||||||
|
|
||||||
|
static void mca_oob_xcast_cb(int status, orte_process_name_t* peer, orte_buffer_t* buffer, int tag, void* cbdata)
|
||||||
|
{
|
||||||
|
mca_oob_xcast_t* xcast = (mca_oob_xcast_t*)cbdata;
|
||||||
|
OPAL_THREAD_LOCK(&xcast->mutex);
|
||||||
|
if(--xcast->counter == 0) {
|
||||||
|
opal_condition_signal(&xcast->cond);
|
||||||
|
}
|
||||||
|
OPAL_THREAD_UNLOCK(&xcast->mutex);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int mca_oob_xcast(
|
int mca_oob_xcast(
|
||||||
orte_process_name_t* root,
|
orte_process_name_t* root,
|
||||||
orte_process_name_t* peers,
|
orte_process_name_t* peers,
|
||||||
@ -59,6 +98,8 @@ int mca_oob_xcast(
|
|||||||
/* check to see if I am the root process name */
|
/* check to see if I am the root process name */
|
||||||
cmpval = orte_ns.compare(ORTE_NS_CMP_ALL, root, orte_process_info.my_name);
|
cmpval = orte_ns.compare(ORTE_NS_CMP_ALL, root, orte_process_info.my_name);
|
||||||
if(NULL != root && 0 == cmpval) {
|
if(NULL != root && 0 == cmpval) {
|
||||||
|
mca_oob_xcast_t *xcast = OBJ_NEW(mca_oob_xcast_t);
|
||||||
|
xcast->counter = num_peers;
|
||||||
for(i=0; i<num_peers; i++) {
|
for(i=0; i<num_peers; i++) {
|
||||||
/* check status of peer to ensure they are alive */
|
/* check status of peer to ensure they are alive */
|
||||||
if (ORTE_SUCCESS != (rc = orte_soh.get_proc_soh(&state, &status, peers+i))) {
|
if (ORTE_SUCCESS != (rc = orte_soh.get_proc_soh(&state, &status, peers+i))) {
|
||||||
@ -66,13 +107,22 @@ int mca_oob_xcast(
|
|||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
if (state != ORTE_PROC_STATE_TERMINATED) {
|
if (state != ORTE_PROC_STATE_TERMINATED) {
|
||||||
rc = mca_oob_send_packed(peers+i, buffer, tag, 0);
|
rc = mca_oob_send_packed_nb(peers+i, buffer, tag, 0, mca_oob_xcast_cb, xcast);
|
||||||
if (rc < 0) {
|
if (rc < 0) {
|
||||||
ORTE_ERROR_LOG(rc);
|
ORTE_ERROR_LOG(rc);
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* wait for all non-blocking operations to complete */
|
||||||
|
OPAL_THREAD_LOCK(&xcast->mutex);
|
||||||
|
while(xcast->counter > 0) {
|
||||||
|
opal_condition_wait(&xcast->cond, &xcast->mutex);
|
||||||
|
}
|
||||||
|
OPAL_THREAD_UNLOCK(&xcast->mutex);
|
||||||
|
OBJ_RELEASE(xcast);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
orte_buffer_t rbuf;
|
orte_buffer_t rbuf;
|
||||||
orte_gpr_notify_message_t *msg;
|
orte_gpr_notify_message_t *msg;
|
||||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user