1
1

Cleanup some errors in pubsub - must set the active flag before posting the recv in case the message has already arrived

Refs trac:3696

This commit was SVN r29167.

The following Trac tickets were found above:
  Ticket 3696 --> https://svn.open-mpi.org/trac/ompi/ticket/3696
Этот коммит содержится в:
Ralph Castain 2013-09-15 15:26:32 +00:00
родитель 497c7e6abb
Коммит b64c8dafd8
2 изменённых файлов: 23 добавлений и 8 удалений

Просмотреть файл

@ -188,24 +188,28 @@ static int publish ( char *service_name, ompi_info_t *info, char *port_name )
/* pack the publish command */
if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &cmd, 1, ORTE_DATA_SERVER_CMD))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
goto CLEANUP;
}
/* pack the service name */
if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &service_name, 1, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
goto CLEANUP;
}
/* pack the port name */
if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &port_name, 1, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
goto CLEANUP;
}
/* pack the uniqueness flag */
if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &unique, 1, OPAL_BOOL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
goto CLEANUP;
}
@ -214,16 +218,17 @@ static int publish ( char *service_name, ompi_info_t *info, char *port_name )
ORTE_RML_TAG_DATA_SERVER,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
goto CLEANUP;
}
/* get the answer */
OBJ_CONSTRUCT(&xfer, orte_rml_recv_cb_t);
xfer.active = true;
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_DATA_CLIENT,
ORTE_RML_NON_PERSISTENT,
orte_rml_recv_callback, &xfer);
xfer.active = true;
OMPI_WAIT_FOR_COMPLETION(xfer.active);
/* unpack the result */
@ -234,9 +239,7 @@ static int publish ( char *service_name, ompi_info_t *info, char *port_name )
rc = ret;
OBJ_DESTRUCT(&xfer);
CLEANUP:
OBJ_DESTRUCT(&buf);
CLEANUP:
return rc;
}
@ -398,11 +401,11 @@ static char* lookup ( char *service_name, ompi_info_t *info )
/* get the answer */
OBJ_CONSTRUCT(&xfer, orte_rml_recv_cb_t);
xfer.active = true;
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_DATA_CLIENT,
ORTE_RML_NON_PERSISTENT,
orte_rml_recv_callback, &xfer);
xfer.active = true;
OMPI_WAIT_FOR_COMPLETION(xfer.active);
/* unpack the return code */
@ -529,10 +532,10 @@ static int unpublish ( char *service_name, ompi_info_t *info )
/* get the answer */
OBJ_CONSTRUCT(&xfer, orte_rml_recv_cb_t);
xfer.active = true;
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DATA_CLIENT,
ORTE_RML_NON_PERSISTENT,
orte_rml_recv_callback, &xfer);
xfer.active = true;
OMPI_WAIT_FOR_COMPLETION(xfer.active);
/* unpack the result */

Просмотреть файл

@ -6,6 +6,7 @@
*/
#include <stdio.h>
#include <stdbool.h>
#include "mpi.h"
int main(int argc, char* argv[])
@ -13,6 +14,13 @@ int main(int argc, char* argv[])
int rank, size;
MPI_Info info, srch;
char port[MPI_MAX_PORT_NAME];
bool local=false;
if (1 < argc) {
if (0 == strcmp("local", argv[1])) {
local = true;
}
}
MPI_Init(&argc, &argv);
@ -22,8 +30,12 @@ int main(int argc, char* argv[])
printf("Hello, World, I am %d of %d\n", rank, size);
MPI_Info_create(&info);
MPI_Info_set(info, "ompi_global_scope", "true");
if (local) {
MPI_Info_set(info, "ompi_global_scope", "false");
} else {
MPI_Info_set(info, "ompi_global_scope", "true");
}
if (0 == rank) {
MPI_Open_port(MPI_INFO_NULL, port);
MPI_Publish_name("pubsub-test", info, port);