From bebe8520572d9eb771b7f686022281c40709062a Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Wed, 14 Aug 2013 04:21:17 +0000 Subject: [PATCH] Add new info key for publish that allows user to designate that the port is to be unique - i.e., to return an error if that service has already been published. Default is to overwrite This commit was SVN r29028. --- ompi/mca/pubsub/orte/pubsub_orte.c | 13 + orte/runtime/orte_data_server.c | 375 +++++++++++++++-------------- 2 files changed, 213 insertions(+), 175 deletions(-) diff --git a/ompi/mca/pubsub/orte/pubsub_orte.c b/ompi/mca/pubsub/orte/pubsub_orte.c index 9dc611c122..72f2e5bec3 100644 --- a/ompi/mca/pubsub/orte/pubsub_orte.c +++ b/ompi/mca/pubsub/orte/pubsub_orte.c @@ -132,6 +132,7 @@ static int publish ( char *service_name, ompi_info_t *info, char *port_name ) opal_buffer_t buf; orte_data_server_cmd_t cmd=ORTE_DATA_SERVER_PUBLISH; orte_std_cntr_t cnt; + bool unique=false; ompi_info_get_bool(info, "ompi_global_scope", &global_scope, &flag); @@ -172,6 +173,12 @@ static int publish ( char *service_name, ompi_info_t *info, char *port_name ) ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), service_name, global_scope ? "Global" : "Local")); + ompi_info_get_bool(info, "ompi_unique", &unique, &flag); + if (0 == flag) { + /* uniqueness not specified - overwrite by default */ + unique = false; + } + /* construct the buffer */ OBJ_CONSTRUCT(&buf, opal_buffer_t); @@ -192,6 +199,12 @@ static int publish ( char *service_name, ompi_info_t *info, char *port_name ) ORTE_ERROR_LOG(rc); goto CLEANUP; } + + /* pack the uniqueness flag */ + if (OPAL_SUCCESS != (rc = opal_dss.pack(&buf, &unique, 1, OPAL_BOOL))) { + ORTE_ERROR_LOG(rc); + goto CLEANUP; + } /* send the data */ if (0 > (rc = orte_rml.send_buffer(info_host, &buf, ORTE_RML_TAG_DATA_SERVER, 0))) { diff --git a/orte/runtime/orte_data_server.c b/orte/runtime/orte_data_server.c index c6fea30ff1..2d64082705 100644 --- a/orte/runtime/orte_data_server.c +++ b/orte/runtime/orte_data_server.c @@ -163,13 +163,14 @@ void orte_data_server(int status, orte_process_name_t* sender, orte_data_object_t *data; opal_buffer_t *answer; int rc, ret; - count = 1; + bool unique; OPAL_OUTPUT_VERBOSE((1, orte_debug_output, "%s data server got message from %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(sender))); + count = 1; if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &command, &count, ORTE_DATA_SERVER_CMD))) { ORTE_ERROR_LOG(rc); return; @@ -178,31 +179,41 @@ void orte_data_server(int status, orte_process_name_t* sender, answer = OBJ_NEW(opal_buffer_t); switch(command) { - case ORTE_DATA_SERVER_PUBLISH: - /* unpack the service name */ - count = 1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &service_name, &count, OPAL_STRING))) { - ORTE_ERROR_LOG(rc); - goto SEND_ERROR; - } + case ORTE_DATA_SERVER_PUBLISH: + /* unpack the service name */ + count = 1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &service_name, &count, OPAL_STRING))) { + ORTE_ERROR_LOG(rc); + goto SEND_ERROR; + } - /* unpack the port name */ - count = 1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &port_name, &count, OPAL_STRING))) { - ORTE_ERROR_LOG(rc); - goto SEND_ERROR; - } + /* unpack the port name */ + count = 1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &port_name, &count, OPAL_STRING))) { + ORTE_ERROR_LOG(rc); + goto SEND_ERROR; + } - OPAL_OUTPUT_VERBOSE((1, orte_debug_output, - "%s data server: publishing service %s port %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - service_name, port_name)); + /* unpack uniqueness flag */ + count = 1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &unique, &count, OPAL_BOOL))) { + ORTE_ERROR_LOG(rc); + goto SEND_ERROR; + } + + OPAL_OUTPUT_VERBOSE((1, orte_debug_output, + "%s data server: publishing service %s port %s %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + service_name, port_name, + unique ? "UNIQUE" : "OVERWRITE")); - /* check the current data store to see if this service name has already - * been published - */ - if (NULL != lookup(service_name)) { - /* already exists - return ORTE_EXISTS error code */ + /* check the current data store to see if this service name has already + * been published + */ + if (NULL != (data = lookup(service_name))) { + /* already exists - see if overwrite allowed */ + if (unique) { + /* return ORTE_EXISTS error code */ OPAL_OUTPUT_VERBOSE((1, orte_debug_output, "%s data server: publishing service %s port %s already exists", @@ -210,167 +221,82 @@ void orte_data_server(int status, orte_process_name_t* sender, service_name, port_name)); ret = ORTE_EXISTS; - if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &ret, 1, OPAL_INT))) { - ORTE_ERROR_LOG(rc); - /* if we can't pack it, we probably can't pack the - * rc value either, so just send whatever is there - */ - } - goto SEND_ANSWER; - } - - /* create a new data object */ - data = OBJ_NEW(orte_data_object_t); - - /* pass over the data values - these were malloc'd when unpacked, - * so we don't need to strdup them here - */ - data->service = service_name; - data->port = port_name; - data->owner.jobid = sender->jobid; - data->owner.vpid = sender->vpid; - - /* store the data */ - data->index = opal_pointer_array_add(orte_data_server_store, data); - - OPAL_OUTPUT_VERBOSE((1, orte_debug_output, - "%s data server: successfully published service %s port %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - service_name, port_name)); - - /* tell the user it was wonderful... */ - ret = ORTE_SUCCESS; - if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &ret, 1, OPAL_INT))) { - ORTE_ERROR_LOG(rc); - /* if we can't pack it, we probably can't pack the - * rc value either, so just send whatever is there - */ - } - goto SEND_ANSWER; - break; - - case ORTE_DATA_SERVER_LOOKUP: - /* unpack the service name */ - count = 1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &service_name, &count, OPAL_STRING))) { - ORTE_ERROR_LOG(rc); - goto SEND_ERROR; - } - - OPAL_OUTPUT_VERBOSE((1, orte_debug_output, - "%s data server: lookup on service %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - service_name)); - - /* locate this record in the data store */ - if (NULL == (data = lookup(service_name))) { - + } else { OPAL_OUTPUT_VERBOSE((1, orte_debug_output, - "%s data server: service %s not found", + "%s data server: overwriting service %s with port %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - service_name)); - - /* return ORTE_ERR_NOT_FOUND error code */ - ret = ORTE_ERR_NOT_FOUND; - if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &ret, 1, OPAL_INT))) { - ORTE_ERROR_LOG(rc); - /* if we can't pack it, we probably can't pack the - * rc value either, so just send whatever is there - */ + service_name, port_name)); + if (NULL != data->port) { + free(data->port); } - goto SEND_ANSWER; + data->port = port_name; + data->owner.jobid = sender->jobid; + data->owner.vpid = sender->vpid; + ret = ORTE_SUCCESS; } - - OPAL_OUTPUT_VERBOSE((1, orte_debug_output, - "%s data server: successful lookup on service %s port %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - service_name, data->port)); - - /* pack success so the unpack on the other end can - * always unpack an int first - */ - ret = ORTE_SUCCESS; if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &ret, 1, OPAL_INT))) { ORTE_ERROR_LOG(rc); /* if we can't pack it, we probably can't pack the * rc value either, so just send whatever is there */ - goto SEND_ANSWER; - } - - /* pack the returned port */ - if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &data->port, 1, OPAL_STRING))) { - ORTE_ERROR_LOG(rc); - /* if we can't pack it, we probably can't pack the - * rc value either, so just send whatever is there - */ } goto SEND_ANSWER; - break; - - case ORTE_DATA_SERVER_UNPUBLISH: - /* unpack the service name */ - count = 1; - if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &service_name, &count, OPAL_STRING))) { - ORTE_ERROR_LOG(rc); - goto SEND_ERROR; - } + } + + /* create a new data object */ + data = OBJ_NEW(orte_data_object_t); + + /* pass over the data values - these were malloc'd when unpacked, + * so we don't need to strdup them here + */ + data->service = service_name; + data->port = port_name; + data->owner.jobid = sender->jobid; + data->owner.vpid = sender->vpid; + + /* store the data */ + data->index = opal_pointer_array_add(orte_data_server_store, data); + + OPAL_OUTPUT_VERBOSE((1, orte_debug_output, + "%s data server: successfully published service %s port %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + service_name, port_name)); + + /* tell the user it was wonderful... */ + ret = ORTE_SUCCESS; + if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &ret, 1, OPAL_INT))) { + ORTE_ERROR_LOG(rc); + /* if we can't pack it, we probably can't pack the + * rc value either, so just send whatever is there + */ + } + goto SEND_ANSWER; + break; + + case ORTE_DATA_SERVER_LOOKUP: + /* unpack the service name */ + count = 1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &service_name, &count, OPAL_STRING))) { + ORTE_ERROR_LOG(rc); + goto SEND_ERROR; + } + + OPAL_OUTPUT_VERBOSE((1, orte_debug_output, + "%s data server: lookup on service %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + service_name)); + + /* locate this record in the data store */ + if (NULL == (data = lookup(service_name))) { + OPAL_OUTPUT_VERBOSE((1, orte_debug_output, - "%s data server: unpublish on service %s", + "%s data server: service %s not found", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), service_name)); - - /* locate this record in the data store */ - if (NULL == (data = lookup(service_name))) { - OPAL_OUTPUT_VERBOSE((1, orte_debug_output, - "%s data server: service %s not found", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - service_name)); - - /* return ORTE_ERR_NOT_FOUND error code */ - ret = ORTE_ERR_NOT_FOUND; - if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &ret, 1, OPAL_INT))) { - ORTE_ERROR_LOG(rc); - /* if we can't pack it, we probably can't pack the - * rc value either, so just send whatever is there - */ - } - goto SEND_ANSWER; - } - - /* check to see if the sender owns it - must be exact match */ - if (OPAL_EQUAL != orte_util_compare_name_fields(ORTE_NS_CMP_ALL, - &data->owner, sender)) { - - OPAL_OUTPUT_VERBOSE((1, orte_debug_output, - "%s data server: service %s not owned by sender %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - service_name, ORTE_NAME_PRINT(sender))); - - /* nope - return ORTE_ERR_PERM error code */ - ret = ORTE_ERR_PERM; - if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &ret, 1, OPAL_INT))) { - ORTE_ERROR_LOG(rc); - /* if we can't pack it, we probably can't pack the - * rc value either, so just send whatever is there - */ - } - goto SEND_ANSWER; - } - - /* delete the object from the data store */ - opal_pointer_array_set_item(orte_data_server_store, data->index, NULL); - OBJ_RELEASE(data); - - OPAL_OUTPUT_VERBOSE((1, orte_debug_output, - "%s data server: service %s unpublished", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - service_name)); - - /* tell the sender this succeeded */ - ret = ORTE_SUCCESS; + /* return ORTE_ERR_NOT_FOUND error code */ + ret = ORTE_ERR_NOT_FOUND; if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &ret, 1, OPAL_INT))) { ORTE_ERROR_LOG(rc); /* if we can't pack it, we probably can't pack the @@ -378,21 +304,120 @@ void orte_data_server(int status, orte_process_name_t* sender, */ } goto SEND_ANSWER; - break; + } + + OPAL_OUTPUT_VERBOSE((1, orte_debug_output, + "%s data server: successful lookup on service %s port %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + service_name, data->port)); + + /* pack success so the unpack on the other end can + * always unpack an int first + */ + ret = ORTE_SUCCESS; + if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &ret, 1, OPAL_INT))) { + ORTE_ERROR_LOG(rc); + /* if we can't pack it, we probably can't pack the + * rc value either, so just send whatever is there + */ + goto SEND_ANSWER; + } + + /* pack the returned port */ + if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &data->port, 1, OPAL_STRING))) { + ORTE_ERROR_LOG(rc); + /* if we can't pack it, we probably can't pack the + * rc value either, so just send whatever is there + */ + } + goto SEND_ANSWER; + break; - default: - ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM); - rc = ORTE_ERR_BAD_PARAM; - break; + case ORTE_DATA_SERVER_UNPUBLISH: + /* unpack the service name */ + count = 1; + if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &service_name, &count, OPAL_STRING))) { + ORTE_ERROR_LOG(rc); + goto SEND_ERROR; + } + + OPAL_OUTPUT_VERBOSE((1, orte_debug_output, + "%s data server: unpublish on service %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + service_name)); + + /* locate this record in the data store */ + if (NULL == (data = lookup(service_name))) { + + OPAL_OUTPUT_VERBOSE((1, orte_debug_output, + "%s data server: service %s not found", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + service_name)); + + /* return ORTE_ERR_NOT_FOUND error code */ + ret = ORTE_ERR_NOT_FOUND; + if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &ret, 1, OPAL_INT))) { + ORTE_ERROR_LOG(rc); + /* if we can't pack it, we probably can't pack the + * rc value either, so just send whatever is there + */ + } + goto SEND_ANSWER; + } + + /* check to see if the sender owns it - must be exact match */ + if (OPAL_EQUAL != orte_util_compare_name_fields(ORTE_NS_CMP_ALL, + &data->owner, sender)) { + + OPAL_OUTPUT_VERBOSE((1, orte_debug_output, + "%s data server: service %s not owned by sender %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + service_name, ORTE_NAME_PRINT(sender))); + + /* nope - return ORTE_ERR_PERM error code */ + ret = ORTE_ERR_PERM; + if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &ret, 1, OPAL_INT))) { + ORTE_ERROR_LOG(rc); + /* if we can't pack it, we probably can't pack the + * rc value either, so just send whatever is there + */ + } + goto SEND_ANSWER; + } + + /* delete the object from the data store */ + opal_pointer_array_set_item(orte_data_server_store, data->index, NULL); + OBJ_RELEASE(data); + + OPAL_OUTPUT_VERBOSE((1, orte_debug_output, + "%s data server: service %s unpublished", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + service_name)); + + /* tell the sender this succeeded */ + ret = ORTE_SUCCESS; + if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &ret, 1, OPAL_INT))) { + ORTE_ERROR_LOG(rc); + /* if we can't pack it, we probably can't pack the + * rc value either, so just send whatever is there + */ + } + goto SEND_ANSWER; + break; + + default: + ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM); + rc = ORTE_ERR_BAD_PARAM; + break; } -SEND_ERROR: + SEND_ERROR: /* pack the error code */ if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &rc, 1, OPAL_INT))) { ORTE_ERROR_LOG(ret); } -SEND_ANSWER: + SEND_ANSWER: if (0 > (rc = orte_rml.send_buffer_nb(sender, answer, ORTE_RML_TAG_DATA_CLIENT, 0, rml_cbfunc, NULL))) { ORTE_ERROR_LOG(rc); OBJ_RELEASE(answer);