/*
 * File: openmpi/orte/mca/gpr/proxy/gpr_proxy_subscribe.c
 * (web source-viewer listing: 397 lines, 13 KiB, C)
 */

/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2005 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/** @file:
*
* The Open MPI general purpose registry - implementation.
*
*/
/*
* includes
*/
#include "orte_config.h"
#include "include/orte_constants.h"
#include "dss/dss.h"
#include "opal/util/output.h"
#include "opal/util/trace.h"
#include "util/proc_info.h"
#include "mca/ns/ns_types.h"
#include "mca/errmgr/errmgr.h"
#include "mca/oob/oob_types.h"
#include "mca/rml/rml.h"
#include "gpr_proxy.h"
/**
 * Register subscriptions and/or triggers with the registry replica.
 *
 * The subscriptions and triggers are first entered on the local trackers
 * (orte_gpr_proxy_enter_subscription / orte_gpr_proxy_enter_trigger). The
 * request is then either packed into the pending compound command buffer
 * (when compound cmd mode is on) or packed into a fresh buffer, sent to the
 * replica, and the replica's reply is received and unpacked.
 *
 * The proxy mutex is held for the whole operation and released on every
 * exit path.
 *
 * @param num_subs       Number of entries in @c subscriptions.
 * @param subscriptions  Array of subscription pointers; may be NULL as long
 *                       as @c trigs is not.
 * @param num_trigs      Number of entries in @c trigs.
 * @param trigs          Array of trigger pointers; may be NULL as long as
 *                       @c subscriptions is not.
 *
 * @retval ORTE_SUCCESS              request was packed, or sent and the
 *                                   reply was unpacked successfully
 * @retval ORTE_ERR_BAD_PARAM        both @c subscriptions and @c trigs are NULL
 * @retval ORTE_ERR_OUT_OF_RESOURCE  a buffer could not be allocated
 * @retval ORTE_ERR_COMM_FAILURE     the send to, or receive from, the replica failed
 * @retval other                     error code propagated from a helper
 */
int
orte_gpr_proxy_subscribe(size_t num_subs,
                         orte_gpr_subscription_t **subscriptions,
                         size_t num_trigs,
                         orte_gpr_trigger_t **trigs)
{
    orte_buffer_t *cmd;
    orte_buffer_t *answer;
    orte_gpr_proxy_subscriber_t **subs;
    int rc = ORTE_SUCCESS, ret;
    int cleanup_rc;
    size_t i;

    OPAL_TRACE(1);

    /* need to protect against errors */
    if (NULL == subscriptions && NULL == trigs) { /* need at least one */
        ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
        return ORTE_ERR_BAD_PARAM;
    }

    OPAL_THREAD_LOCK(&orte_gpr_proxy_globals.mutex);

    /* store callback function and user_tag in local list for lookup
     * generate id_tag to send to replica to identify lookup entry
     * for each subscription
     */
    if (NULL != subscriptions) {
        if (ORTE_SUCCESS != (rc = orte_gpr_proxy_enter_subscription(
                                        num_subs, subscriptions))) {
            ORTE_ERROR_LOG(rc);
            OPAL_THREAD_UNLOCK(&orte_gpr_proxy_globals.mutex);
            return rc;
        }
    }

    /* if any triggers were provided, get id tags for them */
    if (NULL != trigs) {
        if (ORTE_SUCCESS != (rc = orte_gpr_proxy_enter_trigger(
                                        num_trigs, trigs))) {
            ORTE_ERROR_LOG(rc);
            /* NOTE(review): subscriptions entered above are not removed
             * from the local tracker on this path - confirm whether this
             * should also go through subscribe_error.
             */
            OPAL_THREAD_UNLOCK(&orte_gpr_proxy_globals.mutex);
            return rc;
        }
    }

    /* check for compound cmd mode - if on, just pack the info into the
     * compound cmd buffer and return
     */
    if (orte_gpr_proxy_globals.compound_cmd_mode) {
        if (ORTE_SUCCESS != (rc = orte_gpr_base_pack_subscribe(orte_gpr_proxy_globals.compound_cmd,
                                        num_subs, subscriptions,
                                        num_trigs, trigs))) {
            ORTE_ERROR_LOG(rc);
            goto subscribe_error;
        }

        /* done */
        OPAL_THREAD_UNLOCK(&orte_gpr_proxy_globals.mutex);
        return ORTE_SUCCESS;
    }

    /* if compound cmd not on, get new buffer to transmit command to replica */
    cmd = OBJ_NEW(orte_buffer_t);
    if (NULL == cmd) {
        ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
        rc = ORTE_ERR_OUT_OF_RESOURCE;
        goto subscribe_error;
    }

    /* pack the command and send it */
    if (ORTE_SUCCESS != (rc = orte_gpr_base_pack_subscribe(cmd,
                                        num_subs, subscriptions,
                                        num_trigs, trigs))) {
        ORTE_ERROR_LOG(rc);
        OBJ_RELEASE(cmd);
        goto subscribe_error;
    }

    if (0 > orte_rml.send_buffer(orte_process_info.gpr_replica, cmd, ORTE_RML_TAG_GPR, 0)) {
        ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
        OBJ_RELEASE(cmd);
        rc = ORTE_ERR_COMM_FAILURE;
        goto subscribe_error;
    }
    OBJ_RELEASE(cmd);

    /* get buffer for reply from replica and get it */
    answer = OBJ_NEW(orte_buffer_t);
    if (NULL == answer) {
        ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
        rc = ORTE_ERR_OUT_OF_RESOURCE;
        goto subscribe_error;
    }

    if (0 > orte_rml.recv_buffer(orte_process_info.gpr_replica, answer, ORTE_RML_TAG_GPR)) {
        ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
        OBJ_RELEASE(answer);
        rc = ORTE_ERR_COMM_FAILURE;
        goto subscribe_error;
    }

    /* unpack the reply - should contain an echo of the subscribe command
     * (to ensure the handshake) and the status code of the request. The
     * unpack function checks the command for us - will return an error
     * if the command was wrong - so all we have to do is record an
     * error if the unpack command doesn't return "success"
     */
    if (ORTE_SUCCESS != (rc = orte_gpr_base_unpack_subscribe(answer, &ret))) {
        ORTE_ERROR_LOG(rc);
        OBJ_RELEASE(answer);
        goto subscribe_error;
    }

    OBJ_RELEASE(answer);
    OPAL_THREAD_UNLOCK(&orte_gpr_proxy_globals.mutex);

    /* NOTE(review): the replica's status code in 'ret' is unpacked but not
     * returned here, whereas orte_gpr_proxy_unsubscribe returns it -
     * confirm whether ignoring it is intentional.
     */
    return ORTE_SUCCESS;

    /* if an error was encountered during processing this request, we need to
     * remove the subscriptions from the subscription tracking system. do this
     * and then exit.
     * NOTE: there is no corresponding function to remove triggers from the local
     * trigger tracking system as nothing is stored on it - we just keep track
     * of how many triggers were generated so we can identify them, and the
     * numbers are NOT re-used.
     */
subscribe_error:
    /* subscriptions is NULL when the caller supplied only triggers, so
     * there is nothing to remove (and nothing safe to dereference)
     */
    if (NULL != subscriptions) {
        subs = (orte_gpr_proxy_subscriber_t**)(orte_gpr_proxy_globals.subscriptions)->addr;
        for (i=0; i < num_subs; i++) {
            /* find the subscription on the local tracker; use a separate
             * status variable so a successful removal does not overwrite
             * the error code that brought us here
             */
            cleanup_rc = orte_gpr_proxy_remove_subscription(subs[subscriptions[i]->id]);
            if (ORTE_SUCCESS != cleanup_rc) {
                ORTE_ERROR_LOG(cleanup_rc);
                OPAL_THREAD_UNLOCK(&orte_gpr_proxy_globals.mutex);
                return cleanup_rc;
            }
        }
    }

    OPAL_THREAD_UNLOCK(&orte_gpr_proxy_globals.mutex);
    return rc;
}
/**
 * Cancel a subscription.
 *
 * The subscription whose id equals @c sub_number is removed from the local
 * tracker. The unsubscribe command is then either packed into the pending
 * compound command buffer (when compound cmd mode is on) or sent to the
 * replica, whose reply is received and unpacked.
 *
 * The proxy mutex is held for the whole operation and released on every
 * exit path.
 *
 * @param sub_number  Id of the subscription to cancel.
 *
 * @retval ORTE_ERR_NOT_FOUND        no tracked subscription has this id
 * @retval ORTE_ERR_OUT_OF_RESOURCE  a buffer could not be allocated
 * @retval ORTE_ERR_COMM_FAILURE     the send to, or receive from, the replica failed
 * @retval other                     in compound cmd mode, the result of packing
 *                                   the command; otherwise an error code from a
 *                                   helper or the status code unpacked from the
 *                                   replica's reply
 */
int orte_gpr_proxy_unsubscribe(orte_gpr_subscription_id_t sub_number)
{
    orte_buffer_t *cmd;
    orte_buffer_t *answer;
    orte_gpr_proxy_subscriber_t **subs;
    size_t i, j;
    int rc, ret;

    OPAL_TRACE(1);

    OPAL_THREAD_LOCK(&orte_gpr_proxy_globals.mutex);

    /* remove the specified subscription from the local tracker; j counts
     * the non-NULL entries seen so the scan can stop once num_subs of them
     * have been examined
     */
    subs = (orte_gpr_proxy_subscriber_t**)(orte_gpr_proxy_globals.subscriptions)->addr;
    for (i=0, j=0; j < orte_gpr_proxy_globals.num_subs &&
                   i < (orte_gpr_proxy_globals.subscriptions)->size; i++) {
        if (NULL != subs[i]) {
            j++;
            if (sub_number == subs[i]->id) {
                if (ORTE_SUCCESS != (rc = orte_gpr_proxy_remove_subscription(subs[i]))) {
                    ORTE_ERROR_LOG(rc);
                    OPAL_THREAD_UNLOCK(&orte_gpr_proxy_globals.mutex);
                    return rc;
                }
                goto PROCESS;
            }
        }
    }

    /* must not have been found - report error. The mutex has to be
     * released here as on every other exit path.
     */
    ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
    OPAL_THREAD_UNLOCK(&orte_gpr_proxy_globals.mutex);
    return ORTE_ERR_NOT_FOUND;

PROCESS:
    /* if in compound cmd mode, then just pack the command into
     * that buffer and return
     */
    if (orte_gpr_proxy_globals.compound_cmd_mode) {
        if (ORTE_SUCCESS != (rc = orte_gpr_base_pack_unsubscribe(orte_gpr_proxy_globals.compound_cmd,
                                        sub_number))) {
            ORTE_ERROR_LOG(rc);
        }
        OPAL_THREAD_UNLOCK(&orte_gpr_proxy_globals.mutex);
        return rc;
    }

    /* if not in compound cmd mode, then init a new buffer to
     * transmit the command
     */
    cmd = OBJ_NEW(orte_buffer_t);
    if (NULL == cmd) {
        ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
        OPAL_THREAD_UNLOCK(&orte_gpr_proxy_globals.mutex);
        return ORTE_ERR_OUT_OF_RESOURCE;
    }

    /* pack and transmit the command */
    if (ORTE_SUCCESS != (rc = orte_gpr_base_pack_unsubscribe(cmd, sub_number))) {
        ORTE_ERROR_LOG(rc);
        OBJ_RELEASE(cmd);
        OPAL_THREAD_UNLOCK(&orte_gpr_proxy_globals.mutex);
        return rc;
    }

    if (0 > orte_rml.send_buffer(orte_process_info.gpr_replica, cmd, ORTE_RML_TAG_GPR, 0)) {
        ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
        OBJ_RELEASE(cmd);
        OPAL_THREAD_UNLOCK(&orte_gpr_proxy_globals.mutex);
        return ORTE_ERR_COMM_FAILURE;
    }
    OBJ_RELEASE(cmd);

    /* init a buffer to receive the replica's reply */
    answer = OBJ_NEW(orte_buffer_t);
    if (NULL == answer) {
        ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
        OPAL_THREAD_UNLOCK(&orte_gpr_proxy_globals.mutex);
        return ORTE_ERR_OUT_OF_RESOURCE;
    }

    if (0 > orte_rml.recv_buffer(orte_process_info.gpr_replica, answer, ORTE_RML_TAG_GPR)) {
        ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
        OBJ_RELEASE(answer);
        OPAL_THREAD_UNLOCK(&orte_gpr_proxy_globals.mutex);
        return ORTE_ERR_COMM_FAILURE;
    }

    /* unpack the response. This function will automatically check to ensure
     * that the command in the response matches the unsubscribe command, thus
     * verifying the handshake. If the function returns "success", then the
     * commands match and the buffer could be unpacked - all we need do, then
     * is return the replica's response code
     */
    if (ORTE_SUCCESS != (rc = orte_gpr_base_unpack_unsubscribe(answer, &ret))) {
        ORTE_ERROR_LOG(rc);
        OBJ_RELEASE(answer);
        OPAL_THREAD_UNLOCK(&orte_gpr_proxy_globals.mutex);
        return rc;
    }

    OBJ_RELEASE(answer);
    OPAL_THREAD_UNLOCK(&orte_gpr_proxy_globals.mutex);
    return ret;
}
/**
 * Cancel a trigger.
 *
 * The trigger whose id equals @c trig is removed from the local tracker.
 * The cancel command is then either packed into the pending compound
 * command buffer (when compound cmd mode is on) or sent to the replica,
 * whose reply is received and unpacked.
 *
 * The proxy mutex is held for the whole operation and released on every
 * exit path.
 *
 * @param trig  Id of the trigger to cancel.
 *
 * @retval ORTE_ERR_NOT_FOUND        no tracked trigger has this id
 * @retval ORTE_ERR_OUT_OF_RESOURCE  a buffer could not be allocated
 * @retval ORTE_ERR_COMM_FAILURE     the send to, or receive from, the replica failed
 * @retval other                     in compound cmd mode, the result of packing
 *                                   the command; otherwise an error code from a
 *                                   helper or the status code unpacked from the
 *                                   replica's reply
 */
int orte_gpr_proxy_cancel_trigger(orte_gpr_trigger_id_t trig)
{
    orte_buffer_t *cmd;
    orte_buffer_t *answer;
    orte_gpr_proxy_trigger_t **trigs;
    size_t i, j;
    int rc, ret;

    OPAL_TRACE(1);

    OPAL_THREAD_LOCK(&orte_gpr_proxy_globals.mutex);

    /* remove the specified trigger from the local tracker; j counts the
     * non-NULL entries seen so the scan can stop once num_trigs of them
     * have been examined
     */
    trigs = (orte_gpr_proxy_trigger_t**)(orte_gpr_proxy_globals.triggers)->addr;
    for (i=0, j=0; j < orte_gpr_proxy_globals.num_trigs &&
                   i < (orte_gpr_proxy_globals.triggers)->size; i++) {
        if (NULL != trigs[i]) {
            j++;
            if (trig == trigs[i]->id) {
                if (ORTE_SUCCESS != (rc = orte_gpr_proxy_remove_trigger(trigs[i]))) {
                    ORTE_ERROR_LOG(rc);
                    OPAL_THREAD_UNLOCK(&orte_gpr_proxy_globals.mutex);
                    return rc;
                }
                goto PROCESS;
            }
        }
    }

    /* must not have been found - report error. The mutex has to be
     * released here as on every other exit path.
     */
    ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
    OPAL_THREAD_UNLOCK(&orte_gpr_proxy_globals.mutex);
    return ORTE_ERR_NOT_FOUND;

PROCESS:
    /* if the compound cmd mode is on, pack the command into that buffer
     * and return
     */
    if (orte_gpr_proxy_globals.compound_cmd_mode) {
        if (ORTE_SUCCESS != (rc = orte_gpr_base_pack_cancel_trigger(
                                        orte_gpr_proxy_globals.compound_cmd, trig))) {
            ORTE_ERROR_LOG(rc);
        }
        OPAL_THREAD_UNLOCK(&orte_gpr_proxy_globals.mutex);
        return rc;
    }

    /* if compound cmd mode is off, init a new buffer for transmitting the
     * command to the replica
     */
    cmd = OBJ_NEW(orte_buffer_t);
    if (NULL == cmd) {
        ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
        OPAL_THREAD_UNLOCK(&orte_gpr_proxy_globals.mutex);
        return ORTE_ERR_OUT_OF_RESOURCE;
    }

    /* pack the trigger number and transmit the command */
    if (ORTE_SUCCESS != (rc = orte_gpr_base_pack_cancel_trigger(cmd, trig))) {
        ORTE_ERROR_LOG(rc);
        OBJ_RELEASE(cmd);
        OPAL_THREAD_UNLOCK(&orte_gpr_proxy_globals.mutex);
        return rc;
    }

    if (0 > orte_rml.send_buffer(orte_process_info.gpr_replica, cmd, ORTE_RML_TAG_GPR, 0)) {
        ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
        OBJ_RELEASE(cmd);
        OPAL_THREAD_UNLOCK(&orte_gpr_proxy_globals.mutex);
        return ORTE_ERR_COMM_FAILURE;
    }
    OBJ_RELEASE(cmd);

    /* init a buffer to receive the replica's response */
    answer = OBJ_NEW(orte_buffer_t);
    if (NULL == answer) {
        ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
        OPAL_THREAD_UNLOCK(&orte_gpr_proxy_globals.mutex);
        return ORTE_ERR_OUT_OF_RESOURCE;
    }

    if (0 > orte_rml.recv_buffer(orte_process_info.gpr_replica, answer, ORTE_RML_TAG_GPR)) {
        ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
        OBJ_RELEASE(answer);
        OPAL_THREAD_UNLOCK(&orte_gpr_proxy_globals.mutex);
        return ORTE_ERR_COMM_FAILURE;
    }

    /* unpack the response and, if that succeeds, return the replica's
     * response code.
     * NOTE(review): the reply is unpacked with the *unsubscribe* unpack
     * helper even though a cancel_trigger command was sent. If that helper
     * verifies the echoed command, this check may not match - confirm
     * whether a cancel_trigger-specific unpack helper should be used here.
     */
    if (ORTE_SUCCESS != (rc = orte_gpr_base_unpack_unsubscribe(answer, &ret))) {
        ORTE_ERROR_LOG(rc);
        OBJ_RELEASE(answer);
        OPAL_THREAD_UNLOCK(&orte_gpr_proxy_globals.mutex);
        return rc;
    }

    OBJ_RELEASE(answer);
    OPAL_THREAD_UNLOCK(&orte_gpr_proxy_globals.mutex);
    return ret;
}