ka-ching
This commit was SVN r2830.
Этот коммит содержится в:
родитель
fa205d9129
Коммит
92a5f2da1f
@ -25,6 +25,8 @@
|
||||
|
||||
#include "util/output.h"
|
||||
#include "util/proc_info.h"
|
||||
#include "util/sys_info.h"
|
||||
|
||||
#include "mca/gpr/base/base.h"
|
||||
#include "gpr_replica.h"
|
||||
#include "gpr_replica_internals.h"
|
||||
@ -409,20 +411,25 @@ int gpr_replica_subscribe(ompi_registry_mode_t addr_mode,
|
||||
ompi_registry_notify_cb_fn_t cb_func, void *user_tag)
|
||||
{
|
||||
int rc;
|
||||
mca_gpr_notify_id_t local_idtag;
|
||||
|
||||
OMPI_THREAD_LOCK(&mca_gpr_replica_mutex);
|
||||
rc = gpr_replica_subscribe_nl(addr_mode,action,segment,tokens,cb_func,user_tag);
|
||||
|
||||
/* enter request on notify tracking system */
|
||||
local_idtag = gpr_replica_enter_notify_request(NULL, 0, cb_func, user_tag);
|
||||
|
||||
/* process subscription */
|
||||
rc = gpr_replica_subscribe_nl(addr_mode,action,segment,tokens,local_idtag);
|
||||
|
||||
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
|
||||
return rc;
|
||||
}
|
||||
|
||||
|
||||
int gpr_replica_subscribe_nl(ompi_registry_mode_t addr_mode,
|
||||
ompi_registry_notify_action_t action,
|
||||
char *segment, char **tokens,
|
||||
ompi_registry_notify_cb_fn_t cb_func, void *user_tag)
|
||||
ompi_registry_notify_action_t action,
|
||||
char *segment, char **tokens, mca_gpr_notify_id_t id_tag)
|
||||
{
|
||||
mca_gpr_notify_request_tracker_t *trackptr;
|
||||
mca_gpr_idtag_list_t *ptr_free_id;
|
||||
mca_gpr_replica_trigger_list_t *trig;
|
||||
ompi_registry_notify_message_t *notify_msg;
|
||||
|
||||
@ -438,25 +445,10 @@ int gpr_replica_subscribe_nl(ompi_registry_mode_t addr_mode,
|
||||
}
|
||||
|
||||
|
||||
/* enter request on notify tracking system */
|
||||
trackptr = OBJ_NEW(mca_gpr_notify_request_tracker_t);
|
||||
trackptr->requestor = NULL;
|
||||
trackptr->req_tag = 0;
|
||||
trackptr->callback = cb_func;
|
||||
trackptr->user_tag = user_tag;
|
||||
if (ompi_list_is_empty(&mca_gpr_replica_free_notify_id_tags)) {
|
||||
trackptr->id_tag = mca_gpr_replica_last_notify_id_tag;
|
||||
mca_gpr_replica_last_notify_id_tag++;
|
||||
} else {
|
||||
ptr_free_id = (mca_gpr_idtag_list_t*)ompi_list_remove_first(&mca_gpr_replica_free_notify_id_tags);
|
||||
trackptr->id_tag = ptr_free_id->id_tag;
|
||||
}
|
||||
|
||||
/* construct the trigger - add to notify tracking system if success, otherwise dump */
|
||||
/* construct the trigger - add to notify tracking system if success, otherwise dump */
|
||||
if (NULL != (trig = gpr_replica_construct_trigger(OMPI_REGISTRY_SYNCHRO_MODE_NONE, action,
|
||||
addr_mode, segment, tokens,
|
||||
0, trackptr->id_tag))) {
|
||||
ompi_list_append(&mca_gpr_replica_notify_request_tracker, &trackptr->item);
|
||||
0, id_tag))) {
|
||||
|
||||
if (OMPI_REGISTRY_NOTIFY_PRE_EXISTING & action) { /* want list of everything there */
|
||||
notify_msg = gpr_replica_construct_notify_message(addr_mode, segment, trig->tokens);
|
||||
@ -466,7 +458,6 @@ int gpr_replica_subscribe_nl(ompi_registry_mode_t addr_mode,
|
||||
}
|
||||
return OMPI_SUCCESS;
|
||||
} else {
|
||||
OBJ_RELEASE(trackptr);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
}
|
||||
@ -480,16 +471,20 @@ int gpr_replica_unsubscribe(ompi_registry_mode_t addr_mode,
|
||||
OMPI_THREAD_LOCK(&mca_gpr_replica_mutex);
|
||||
rc = gpr_replica_unsubscribe_nl(addr_mode,action,segment,tokens);
|
||||
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
|
||||
return rc;
|
||||
|
||||
if (MCA_GPR_NOTIFY_ID_MAX == rc) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int gpr_replica_unsubscribe_nl(ompi_registry_mode_t addr_mode,
|
||||
ompi_registry_notify_action_t action,
|
||||
char *segment, char **tokens)
|
||||
mca_gpr_notify_id_t gpr_replica_unsubscribe_nl(ompi_registry_mode_t addr_mode,
|
||||
ompi_registry_notify_action_t action,
|
||||
char *segment, char **tokens)
|
||||
{
|
||||
mca_gpr_notify_request_tracker_t *trackptr;
|
||||
mca_gpr_notify_id_t id_tag;
|
||||
mca_gpr_notify_id_t id_tag, req_idtag;
|
||||
|
||||
if (mca_gpr_replica_debug) {
|
||||
ompi_output(0, "[%d,%d,%d] gpr replica: unsubscribe entered: segment %s 1st token %s",
|
||||
@ -499,32 +494,22 @@ int gpr_replica_unsubscribe_nl(ompi_registry_mode_t addr_mode,
|
||||
|
||||
/* protect against errors */
|
||||
if (NULL == segment) {
|
||||
return OMPI_ERROR;
|
||||
return MCA_GPR_NOTIFY_ID_MAX;
|
||||
}
|
||||
|
||||
|
||||
/* find trigger on replica - return id_tag */
|
||||
if (MCA_GPR_NOTIFY_ID_MAX == (id_tag = gpr_replica_remove_trigger(OMPI_REGISTRY_SYNCHRO_MODE_NONE, action,
|
||||
addr_mode, segment, tokens, 0))) {
|
||||
return OMPI_ERROR;
|
||||
id_tag = gpr_replica_remove_trigger(OMPI_REGISTRY_SYNCHRO_MODE_NONE, action,
|
||||
addr_mode, segment, tokens, 0);
|
||||
|
||||
if (MCA_GPR_NOTIFY_ID_MAX != id_tag) { /* removed trigger successfully */
|
||||
|
||||
req_idtag = gpr_replica_remove_notify_request(id_tag);
|
||||
return req_idtag;
|
||||
} else {
|
||||
return MCA_GPR_NOTIFY_ID_MAX;
|
||||
}
|
||||
|
||||
/* find request on notify tracking system */
|
||||
for (trackptr = (mca_gpr_notify_request_tracker_t*)ompi_list_get_first(&mca_gpr_replica_notify_request_tracker);
|
||||
trackptr != (mca_gpr_notify_request_tracker_t*)ompi_list_get_end(&mca_gpr_replica_notify_request_tracker) &&
|
||||
trackptr->id_tag != id_tag;
|
||||
trackptr = (mca_gpr_notify_request_tracker_t*)ompi_list_get_next(trackptr));
|
||||
|
||||
/* ...and remove it */
|
||||
if (trackptr != (mca_gpr_notify_request_tracker_t*)ompi_list_get_end(&mca_gpr_replica_notify_request_tracker)) {
|
||||
ompi_list_remove_item(&mca_gpr_replica_notify_request_tracker, &trackptr->item);
|
||||
OBJ_RELEASE(trackptr);
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
/* if we get here, then couldn't find request */
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
int gpr_replica_synchro(ompi_registry_synchro_mode_t synchro_mode,
|
||||
@ -533,19 +518,26 @@ int gpr_replica_synchro(ompi_registry_synchro_mode_t synchro_mode,
|
||||
ompi_registry_notify_cb_fn_t cb_func, void *user_tag)
|
||||
{
|
||||
int rc;
|
||||
mca_gpr_notify_id_t local_idtag;
|
||||
|
||||
OMPI_THREAD_LOCK(&mca_gpr_replica_mutex);
|
||||
rc = gpr_replica_synchro_nl(synchro_mode,addr_mode,segment,tokens,trigger,cb_func,user_tag);
|
||||
|
||||
/* enter request on notify tracking system */
|
||||
local_idtag = gpr_replica_enter_notify_request(NULL, 0, cb_func, user_tag);
|
||||
|
||||
/* process synchro request */
|
||||
rc = gpr_replica_synchro_nl(synchro_mode, addr_mode,
|
||||
segment, tokens, trigger, local_idtag);
|
||||
|
||||
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
|
||||
return rc;
|
||||
}
|
||||
|
||||
int gpr_replica_synchro_nl(ompi_registry_synchro_mode_t synchro_mode,
|
||||
ompi_registry_mode_t addr_mode,
|
||||
char *segment, char **tokens, int trigger,
|
||||
ompi_registry_notify_cb_fn_t cb_func, void *user_tag)
|
||||
ompi_registry_mode_t addr_mode,
|
||||
char *segment, char **tokens, int trigger,
|
||||
mca_gpr_notify_id_t id_tag)
|
||||
{
|
||||
mca_gpr_notify_request_tracker_t *trackptr;
|
||||
mca_gpr_idtag_list_t *ptr_free_id;
|
||||
mca_gpr_replica_trigger_list_t *trig;
|
||||
ompi_registry_notify_message_t *notify_msg;
|
||||
|
||||
@ -560,25 +552,10 @@ int gpr_replica_synchro_nl(ompi_registry_synchro_mode_t synchro_mode,
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
/* enter request on notify tracking system */
|
||||
trackptr = OBJ_NEW(mca_gpr_notify_request_tracker_t);
|
||||
trackptr->requestor = NULL;
|
||||
trackptr->req_tag = 0;
|
||||
trackptr->callback = cb_func;
|
||||
trackptr->user_tag = user_tag;
|
||||
if (ompi_list_is_empty(&mca_gpr_replica_free_notify_id_tags)) {
|
||||
trackptr->id_tag = mca_gpr_replica_last_notify_id_tag;
|
||||
mca_gpr_replica_last_notify_id_tag++;
|
||||
} else {
|
||||
ptr_free_id = (mca_gpr_idtag_list_t*)ompi_list_remove_first(&mca_gpr_replica_free_notify_id_tags);
|
||||
trackptr->id_tag = ptr_free_id->id_tag;
|
||||
}
|
||||
|
||||
/* construct the trigger - add to notify tracking system if success, otherwise dump */
|
||||
/* construct the trigger */
|
||||
if (NULL != (trig = gpr_replica_construct_trigger(synchro_mode, OMPI_REGISTRY_NOTIFY_NONE,
|
||||
addr_mode, segment, tokens,
|
||||
trigger, trackptr->id_tag))) {
|
||||
ompi_list_append(&mca_gpr_replica_notify_request_tracker, &trackptr->item);
|
||||
trigger, id_tag))) {
|
||||
|
||||
/* if synchro condition already met, construct and send message */
|
||||
if ((OMPI_REGISTRY_SYNCHRO_MODE_GT_EQUAL & synchro_mode && trig->count >= trigger) ||
|
||||
@ -591,7 +568,6 @@ int gpr_replica_synchro_nl(ompi_registry_synchro_mode_t synchro_mode,
|
||||
}
|
||||
return OMPI_SUCCESS;
|
||||
} else {
|
||||
OBJ_RELEASE(trackptr);
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
}
|
||||
@ -604,15 +580,19 @@ int gpr_replica_cancel_synchro(ompi_registry_synchro_mode_t synchro_mode,
|
||||
OMPI_THREAD_LOCK(&mca_gpr_replica_mutex);
|
||||
rc = gpr_replica_cancel_synchro_nl(synchro_mode,addr_mode,segment,tokens,trigger);
|
||||
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
|
||||
return rc;
|
||||
|
||||
if (MCA_GPR_NOTIFY_ID_MAX == rc) {
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
int gpr_replica_cancel_synchro_nl(ompi_registry_synchro_mode_t synchro_mode,
|
||||
ompi_registry_mode_t addr_mode,
|
||||
char *segment, char **tokens, int trigger)
|
||||
mca_gpr_notify_id_t gpr_replica_cancel_synchro_nl(ompi_registry_synchro_mode_t synchro_mode,
|
||||
ompi_registry_mode_t addr_mode,
|
||||
char *segment, char **tokens, int trigger)
|
||||
{
|
||||
mca_gpr_notify_request_tracker_t *trackptr;
|
||||
mca_gpr_notify_id_t id_tag;
|
||||
mca_gpr_notify_id_t id_tag, req_idtag;
|
||||
|
||||
if (mca_gpr_replica_debug) {
|
||||
ompi_output(0, "[%d,%d,%d] gpr replica: cancel_synchro entered: segment %s 1st token %s",
|
||||
@ -625,30 +605,140 @@ int gpr_replica_cancel_synchro_nl(ompi_registry_synchro_mode_t synchro_mode,
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
/* find trigger on replica - return id_tag */
|
||||
if (MCA_GPR_NOTIFY_ID_MAX == (id_tag = gpr_replica_remove_trigger(synchro_mode, OMPI_REGISTRY_NOTIFY_NONE,
|
||||
addr_mode, segment, tokens, trigger))) {
|
||||
return OMPI_ERROR;
|
||||
/* find trigger on replica - return local id_tag */
|
||||
id_tag = gpr_replica_remove_trigger(synchro_mode, OMPI_REGISTRY_NOTIFY_NONE,
|
||||
addr_mode, segment, tokens, trigger);
|
||||
|
||||
if (MCA_GPR_NOTIFY_ID_MAX != id_tag) { /* removed trigger successfully */
|
||||
/* remove notify request - return requestor id_tag */
|
||||
req_idtag = gpr_replica_remove_notify_request(id_tag);
|
||||
return req_idtag;
|
||||
} else {
|
||||
return MCA_GPR_NOTIFY_ID_MAX;
|
||||
}
|
||||
|
||||
/* find request on notify tracking system */
|
||||
for (trackptr = (mca_gpr_notify_request_tracker_t*)ompi_list_get_first(&mca_gpr_replica_notify_request_tracker);
|
||||
trackptr != (mca_gpr_notify_request_tracker_t*)ompi_list_get_end(&mca_gpr_replica_notify_request_tracker) &&
|
||||
trackptr->id_tag != id_tag;
|
||||
trackptr = (mca_gpr_notify_request_tracker_t*)ompi_list_get_next(trackptr));
|
||||
|
||||
/* ...and remove it */
|
||||
if (trackptr != (mca_gpr_notify_request_tracker_t*)ompi_list_get_end(&mca_gpr_replica_notify_request_tracker)) {
|
||||
ompi_list_remove_item(&mca_gpr_replica_notify_request_tracker, &trackptr->item);
|
||||
OBJ_RELEASE(trackptr);
|
||||
|
||||
return OMPI_SUCCESS;
|
||||
}
|
||||
|
||||
/* if we get here, then couldn't find request */
|
||||
return OMPI_ERROR;
|
||||
}
|
||||
|
||||
|
||||
int gpr_replica_rte_register(char *contact_info, size_t num_procs,
|
||||
ompi_registry_notify_cb_fn_t start_cb_func, void *start_user_tag,
|
||||
ompi_registry_notify_cb_fn_t end_cb_func, void *end_user_tag)
|
||||
{
|
||||
int ret;
|
||||
mca_gpr_notify_id_t local_idtag1, local_idtag2;
|
||||
ompi_buffer_t buffer;
|
||||
|
||||
/* create the buffer to store the local information */
|
||||
ompi_buffer_init(&buffer, 0);
|
||||
ompi_pack_string(buffer, contact_info);
|
||||
ompi_pack(buffer, &ompi_process_info.pid, 1, OMPI_INT32);
|
||||
ompi_pack_string(buffer, ompi_system_info.nodename);
|
||||
|
||||
OMPI_THREAD_LOCK(&mca_gpr_replica_mutex);
|
||||
|
||||
local_idtag1 = gpr_replica_enter_notify_request(NULL, 0, start_cb_func, start_user_tag);
|
||||
|
||||
local_idtag2 = gpr_replica_enter_notify_request(NULL, 0, end_cb_func, end_user_tag);
|
||||
|
||||
ret = gpr_replica_rte_register_nl(contact_info, buffer, num_procs,
|
||||
local_idtag1, local_idtag2);
|
||||
|
||||
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
|
||||
return ret;
|
||||
}
|
||||
|
||||
int gpr_replica_rte_register_nl(char *contact_info, ompi_buffer_t buffer, size_t num_procs,
|
||||
mca_gpr_notify_id_t start_idtag,
|
||||
mca_gpr_notify_id_t end_idtag)
|
||||
{
|
||||
char *segment;
|
||||
char *keys[2];
|
||||
void *addr;
|
||||
int rc, size;
|
||||
ompi_process_name_t proc={0,0,0};
|
||||
|
||||
|
||||
/* extract process name from contact info */
|
||||
mca_oob_parse_contact_info(contact_info, &proc, NULL);
|
||||
|
||||
/* setup keys and segment for this job */
|
||||
asprintf(&segment, "ompi-job-%s", ns_base_get_jobid_string(&proc));
|
||||
keys[0] = ns_base_get_proc_name_string(&proc);
|
||||
keys[1] = NULL;
|
||||
|
||||
if (mca_gpr_replica_debug) {
|
||||
ompi_output(0, "gpr_replica_register: entered for proc %s", keys[0]);
|
||||
}
|
||||
|
||||
/* peek the buffer and resulting size */
|
||||
ompi_buffer_get(buffer, &addr, &size);
|
||||
|
||||
/* place on registry, no overwrite - error if already there */
|
||||
rc = gpr_replica_put_nl(OMPI_REGISTRY_XAND,
|
||||
segment, keys, addr, size);
|
||||
|
||||
if (OMPI_SUCCESS != rc) {
|
||||
if (mca_gpr_replica_debug) {
|
||||
ompi_output(0, "gpr_replica_register: duplicate registration attempt from %s", keys[0]);
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
/* register a synchro on the segment so we get notified when everyone registers */
|
||||
rc = gpr_replica_synchro_nl(
|
||||
OMPI_REGISTRY_SYNCHRO_MODE_LEVEL|OMPI_REGISTRY_SYNCHRO_MODE_ONE_SHOT,
|
||||
OMPI_REGISTRY_OR,
|
||||
segment,
|
||||
NULL,
|
||||
num_procs,
|
||||
start_idtag);
|
||||
|
||||
/* register a synchro on the segment so we get notified when everyone is gone */
|
||||
rc = gpr_replica_synchro_nl(
|
||||
OMPI_REGISTRY_SYNCHRO_MODE_DESCENDING|OMPI_REGISTRY_SYNCHRO_MODE_ONE_SHOT,
|
||||
OMPI_REGISTRY_OR,
|
||||
segment,
|
||||
NULL,
|
||||
0,
|
||||
end_idtag);
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
int gpr_replica_rte_unregister(char *proc_name_string)
|
||||
{
|
||||
int ret;
|
||||
|
||||
OMPI_THREAD_LOCK(&mca_gpr_replica_mutex);
|
||||
ret = gpr_replica_rte_unregister_nl(proc_name_string);
|
||||
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
int gpr_replica_rte_unregister_nl(char *proc_name_string)
|
||||
{
|
||||
char *segment;
|
||||
char *keys[2];
|
||||
int rc;
|
||||
ompi_process_name_t *proc;
|
||||
|
||||
/* convert string to process name */
|
||||
proc = ns_base_convert_string_to_process_name(proc_name_string);
|
||||
|
||||
/* setup keys and segment for this job */
|
||||
asprintf(&segment, "ompi-job-%s", ns_base_get_jobid_string(proc));
|
||||
keys[0] = strdup(proc_name_string);
|
||||
keys[1] = NULL;
|
||||
|
||||
rc = gpr_replica_delete_object_nl(OMPI_REGISTRY_XAND, segment, keys);
|
||||
free(keys[0]);
|
||||
free(proc);
|
||||
return rc;
|
||||
|
||||
}
|
||||
|
||||
|
||||
ompi_list_t* gpr_replica_get(ompi_registry_mode_t addr_mode,
|
||||
char *segment, char **tokens)
|
||||
{
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user