1
1

Add a new info key ("ompi_unique") for publish that allows the user to designate that the port is to be unique, i.e., to return an error if that service has already been published. The default is to overwrite the existing entry.

This commit was SVN r29028.
Этот коммит содержится в:
Ralph Castain 2013-08-14 04:21:17 +00:00
родитель 72b5e867ab
Коммит bebe852057
2 изменённых файлов: 213 добавлений и 175 удалений

Просмотреть файл

@ -132,6 +132,7 @@ static int publish ( char *service_name, ompi_info_t *info, char *port_name )
opal_buffer_t buf; opal_buffer_t buf;
orte_data_server_cmd_t cmd=ORTE_DATA_SERVER_PUBLISH; orte_data_server_cmd_t cmd=ORTE_DATA_SERVER_PUBLISH;
orte_std_cntr_t cnt; orte_std_cntr_t cnt;
bool unique=false;
ompi_info_get_bool(info, "ompi_global_scope", &global_scope, &flag); ompi_info_get_bool(info, "ompi_global_scope", &global_scope, &flag);
@ -172,6 +173,12 @@ static int publish ( char *service_name, ompi_info_t *info, char *port_name )
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
service_name, global_scope ? "Global" : "Local")); service_name, global_scope ? "Global" : "Local"));
ompi_info_get_bool(info, "ompi_unique", &unique, &flag);
if (0 == flag) {
/* uniqueness not specified - overwrite by default */
unique = false;
}
/* construct the buffer */ /* construct the buffer */
OBJ_CONSTRUCT(&buf, opal_buffer_t); OBJ_CONSTRUCT(&buf, opal_buffer_t);
@ -193,6 +200,12 @@ static int publish ( char *service_name, ompi_info_t *info, char *port_name )
goto CLEANUP; goto CLEANUP;
} }
/* pack the uniqueness flag */
if (OPAL_SUCCESS != (rc = opal_dss.pack(&buf, &unique, 1, OPAL_BOOL))) {
ORTE_ERROR_LOG(rc);
goto CLEANUP;
}
/* send the data */ /* send the data */
if (0 > (rc = orte_rml.send_buffer(info_host, &buf, ORTE_RML_TAG_DATA_SERVER, 0))) { if (0 > (rc = orte_rml.send_buffer(info_host, &buf, ORTE_RML_TAG_DATA_SERVER, 0))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);

Просмотреть файл

@ -163,13 +163,14 @@ void orte_data_server(int status, orte_process_name_t* sender,
orte_data_object_t *data; orte_data_object_t *data;
opal_buffer_t *answer; opal_buffer_t *answer;
int rc, ret; int rc, ret;
count = 1; bool unique;
OPAL_OUTPUT_VERBOSE((1, orte_debug_output, OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
"%s data server got message from %s", "%s data server got message from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender))); ORTE_NAME_PRINT(sender)));
count = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &command, &count, ORTE_DATA_SERVER_CMD))) { if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &command, &count, ORTE_DATA_SERVER_CMD))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
return; return;
@ -178,31 +179,41 @@ void orte_data_server(int status, orte_process_name_t* sender,
answer = OBJ_NEW(opal_buffer_t); answer = OBJ_NEW(opal_buffer_t);
switch(command) { switch(command) {
case ORTE_DATA_SERVER_PUBLISH: case ORTE_DATA_SERVER_PUBLISH:
/* unpack the service name */ /* unpack the service name */
count = 1; count = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &service_name, &count, OPAL_STRING))) { if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &service_name, &count, OPAL_STRING))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
goto SEND_ERROR; goto SEND_ERROR;
} }
/* unpack the port name */ /* unpack the port name */
count = 1; count = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &port_name, &count, OPAL_STRING))) { if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &port_name, &count, OPAL_STRING))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
goto SEND_ERROR; goto SEND_ERROR;
} }
OPAL_OUTPUT_VERBOSE((1, orte_debug_output, /* unpack uniqueness flag */
"%s data server: publishing service %s port %s", count = 1;
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &unique, &count, OPAL_BOOL))) {
service_name, port_name)); ORTE_ERROR_LOG(rc);
goto SEND_ERROR;
}
/* check the current data store to see if this service name has already OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
* been published "%s data server: publishing service %s port %s %s",
*/ ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
if (NULL != lookup(service_name)) { service_name, port_name,
/* already exists - return ORTE_EXISTS error code */ unique ? "UNIQUE" : "OVERWRITE"));
/* check the current data store to see if this service name has already
* been published
*/
if (NULL != (data = lookup(service_name))) {
/* already exists - see if overwrite allowed */
if (unique) {
/* return ORTE_EXISTS error code */
OPAL_OUTPUT_VERBOSE((1, orte_debug_output, OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
"%s data server: publishing service %s port %s already exists", "%s data server: publishing service %s port %s already exists",
@ -210,36 +221,82 @@ void orte_data_server(int status, orte_process_name_t* sender,
service_name, port_name)); service_name, port_name));
ret = ORTE_EXISTS; ret = ORTE_EXISTS;
if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &ret, 1, OPAL_INT))) { } else {
ORTE_ERROR_LOG(rc); OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
/* if we can't pack it, we probably can't pack the "%s data server: overwriting service %s with port %s",
* rc value either, so just send whatever is there ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
*/ service_name, port_name));
if (NULL != data->port) {
free(data->port);
} }
goto SEND_ANSWER; data->port = port_name;
data->owner.jobid = sender->jobid;
data->owner.vpid = sender->vpid;
ret = ORTE_SUCCESS;
} }
if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &ret, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
/* if we can't pack it, we probably can't pack the
* rc value either, so just send whatever is there
*/
}
goto SEND_ANSWER;
/* create a new data object */ }
data = OBJ_NEW(orte_data_object_t);
/* pass over the data values - these were malloc'd when unpacked, /* create a new data object */
* so we don't need to strdup them here data = OBJ_NEW(orte_data_object_t);
/* pass over the data values - these were malloc'd when unpacked,
* so we don't need to strdup them here
*/
data->service = service_name;
data->port = port_name;
data->owner.jobid = sender->jobid;
data->owner.vpid = sender->vpid;
/* store the data */
data->index = opal_pointer_array_add(orte_data_server_store, data);
OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
"%s data server: successfully published service %s port %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
service_name, port_name));
/* tell the user it was wonderful... */
ret = ORTE_SUCCESS;
if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &ret, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
/* if we can't pack it, we probably can't pack the
* rc value either, so just send whatever is there
*/ */
data->service = service_name; }
data->port = port_name; goto SEND_ANSWER;
data->owner.jobid = sender->jobid; break;
data->owner.vpid = sender->vpid;
/* store the data */ case ORTE_DATA_SERVER_LOOKUP:
data->index = opal_pointer_array_add(orte_data_server_store, data); /* unpack the service name */
count = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &service_name, &count, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
goto SEND_ERROR;
}
OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
"%s data server: lookup on service %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
service_name));
/* locate this record in the data store */
if (NULL == (data = lookup(service_name))) {
OPAL_OUTPUT_VERBOSE((1, orte_debug_output, OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
"%s data server: successfully published service %s port %s", "%s data server: service %s not found",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
service_name, port_name)); service_name));
/* tell the user it was wonderful... */ /* return ORTE_ERR_NOT_FOUND error code */
ret = ORTE_SUCCESS; ret = ORTE_ERR_NOT_FOUND;
if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &ret, 1, OPAL_INT))) { if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &ret, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
/* if we can't pack it, we probably can't pack the /* if we can't pack it, we probably can't pack the
@ -247,130 +304,58 @@ void orte_data_server(int status, orte_process_name_t* sender,
*/ */
} }
goto SEND_ANSWER; goto SEND_ANSWER;
break; }
case ORTE_DATA_SERVER_LOOKUP: OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
/* unpack the service name */ "%s data server: successful lookup on service %s port %s",
count = 1; ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &service_name, &count, OPAL_STRING))) { service_name, data->port));
ORTE_ERROR_LOG(rc);
goto SEND_ERROR;
}
OPAL_OUTPUT_VERBOSE((1, orte_debug_output, /* pack success so the unpack on the other end can
"%s data server: lookup on service %s", * always unpack an int first
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), */
service_name)); ret = ORTE_SUCCESS;
if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &ret, 1, OPAL_INT))) {
/* locate this record in the data store */ ORTE_ERROR_LOG(rc);
if (NULL == (data = lookup(service_name))) { /* if we can't pack it, we probably can't pack the
* rc value either, so just send whatever is there
OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
"%s data server: service %s not found",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
service_name));
/* return ORTE_ERR_NOT_FOUND error code */
ret = ORTE_ERR_NOT_FOUND;
if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &ret, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
/* if we can't pack it, we probably can't pack the
* rc value either, so just send whatever is there
*/
}
goto SEND_ANSWER;
}
OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
"%s data server: successful lookup on service %s port %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
service_name, data->port));
/* pack success so the unpack on the other end can
* always unpack an int first
*/ */
ret = ORTE_SUCCESS;
if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &ret, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
/* if we can't pack it, we probably can't pack the
* rc value either, so just send whatever is there
*/
goto SEND_ANSWER;
}
/* pack the returned port */
if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &data->port, 1, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
/* if we can't pack it, we probably can't pack the
* rc value either, so just send whatever is there
*/
}
goto SEND_ANSWER; goto SEND_ANSWER;
break; }
case ORTE_DATA_SERVER_UNPUBLISH: /* pack the returned port */
/* unpack the service name */ if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &data->port, 1, OPAL_STRING))) {
count = 1; ORTE_ERROR_LOG(rc);
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &service_name, &count, OPAL_STRING))) { /* if we can't pack it, we probably can't pack the
ORTE_ERROR_LOG(rc); * rc value either, so just send whatever is there
goto SEND_ERROR; */
} }
goto SEND_ANSWER;
break;
case ORTE_DATA_SERVER_UNPUBLISH:
/* unpack the service name */
count = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &service_name, &count, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
goto SEND_ERROR;
}
OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
"%s data server: unpublish on service %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
service_name));
/* locate this record in the data store */
if (NULL == (data = lookup(service_name))) {
OPAL_OUTPUT_VERBOSE((1, orte_debug_output, OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
"%s data server: unpublish on service %s", "%s data server: service %s not found",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
service_name)); service_name));
/* locate this record in the data store */ /* return ORTE_ERR_NOT_FOUND error code */
if (NULL == (data = lookup(service_name))) { ret = ORTE_ERR_NOT_FOUND;
OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
"%s data server: service %s not found",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
service_name));
/* return ORTE_ERR_NOT_FOUND error code */
ret = ORTE_ERR_NOT_FOUND;
if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &ret, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
/* if we can't pack it, we probably can't pack the
* rc value either, so just send whatever is there
*/
}
goto SEND_ANSWER;
}
/* check to see if the sender owns it - must be exact match */
if (OPAL_EQUAL != orte_util_compare_name_fields(ORTE_NS_CMP_ALL,
&data->owner, sender)) {
OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
"%s data server: service %s not owned by sender %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
service_name, ORTE_NAME_PRINT(sender)));
/* nope - return ORTE_ERR_PERM error code */
ret = ORTE_ERR_PERM;
if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &ret, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
/* if we can't pack it, we probably can't pack the
* rc value either, so just send whatever is there
*/
}
goto SEND_ANSWER;
}
/* delete the object from the data store */
opal_pointer_array_set_item(orte_data_server_store, data->index, NULL);
OBJ_RELEASE(data);
OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
"%s data server: service %s unpublished",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
service_name));
/* tell the sender this succeeded */
ret = ORTE_SUCCESS;
if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &ret, 1, OPAL_INT))) { if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &ret, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
/* if we can't pack it, we probably can't pack the /* if we can't pack it, we probably can't pack the
@ -378,21 +363,61 @@ void orte_data_server(int status, orte_process_name_t* sender,
*/ */
} }
goto SEND_ANSWER; goto SEND_ANSWER;
break; }
default: /* check to see if the sender owns it - must be exact match */
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM); if (OPAL_EQUAL != orte_util_compare_name_fields(ORTE_NS_CMP_ALL,
rc = ORTE_ERR_BAD_PARAM; &data->owner, sender)) {
break;
OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
"%s data server: service %s not owned by sender %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
service_name, ORTE_NAME_PRINT(sender)));
/* nope - return ORTE_ERR_PERM error code */
ret = ORTE_ERR_PERM;
if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &ret, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
/* if we can't pack it, we probably can't pack the
* rc value either, so just send whatever is there
*/
}
goto SEND_ANSWER;
}
/* delete the object from the data store */
opal_pointer_array_set_item(orte_data_server_store, data->index, NULL);
OBJ_RELEASE(data);
OPAL_OUTPUT_VERBOSE((1, orte_debug_output,
"%s data server: service %s unpublished",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
service_name));
/* tell the sender this succeeded */
ret = ORTE_SUCCESS;
if (ORTE_SUCCESS != (rc = opal_dss.pack(answer, &ret, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
/* if we can't pack it, we probably can't pack the
* rc value either, so just send whatever is there
*/
}
goto SEND_ANSWER;
break;
default:
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
rc = ORTE_ERR_BAD_PARAM;
break;
} }
SEND_ERROR: SEND_ERROR:
/* pack the error code */ /* pack the error code */
if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &rc, 1, OPAL_INT))) { if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &rc, 1, OPAL_INT))) {
ORTE_ERROR_LOG(ret); ORTE_ERROR_LOG(ret);
} }
SEND_ANSWER: SEND_ANSWER:
if (0 > (rc = orte_rml.send_buffer_nb(sender, answer, ORTE_RML_TAG_DATA_CLIENT, 0, rml_cbfunc, NULL))) { if (0 > (rc = orte_rml.send_buffer_nb(sender, answer, ORTE_RML_TAG_DATA_CLIENT, 0, rml_cbfunc, NULL))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
OBJ_RELEASE(answer); OBJ_RELEASE(answer);