1
1

Patch a few things that were causing trouble for programs that re-entered the registry during a callback function. Also fixed a timing problem in rte_monitor - ensured that we were in fact already waiting on a condition before generating a wakeup signal. Adjusted the timing of mpirun to ensure that the synchro to alert mpirun of all-processes-done got registered before they completed.

This commit was SVN r2885.
Этот коммит содержится в:
Ralph Castain 2004-09-29 21:54:57 +00:00
родитель d5f4ebde71
Коммит b42a361302
7 изменённых файлов: 124 добавлений и 50 удалений

Просмотреть файл

@ -38,6 +38,7 @@ int gpr_replica_delete_segment(char *segment)
OMPI_THREAD_LOCK(&mca_gpr_replica_mutex);
rc = gpr_replica_delete_segment_nl(segment);
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
gpr_replica_process_callbacks();
return rc;
}
@ -73,6 +74,7 @@ int gpr_replica_put(ompi_registry_mode_t addr_mode, char *segment,
OMPI_THREAD_LOCK(&mca_gpr_replica_mutex);
rc = gpr_replica_put_nl(addr_mode, segment, tokens, object, size);
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
gpr_replica_process_callbacks();
return rc;
}
@ -230,6 +232,7 @@ int gpr_replica_delete_object(ompi_registry_mode_t addr_mode,
OMPI_THREAD_LOCK(&mca_gpr_replica_mutex);
rc = gpr_replica_delete_object_nl(addr_mode, segment, tokens);
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
gpr_replica_process_callbacks();
return rc;
}
@ -422,6 +425,8 @@ int gpr_replica_subscribe(ompi_registry_mode_t addr_mode,
rc = gpr_replica_subscribe_nl(addr_mode,action,segment,tokens,local_idtag);
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
gpr_replica_process_callbacks();
return rc;
}
@ -530,6 +535,8 @@ int gpr_replica_synchro(ompi_registry_synchro_mode_t synchro_mode,
segment, tokens, trigger, local_idtag);
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
gpr_replica_process_callbacks();
return rc;
}
@ -643,6 +650,8 @@ int gpr_replica_rte_register(char *contact_info, size_t num_procs,
local_idtag1, local_idtag2);
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
gpr_replica_process_callbacks();
return ret;
}
@ -712,6 +721,7 @@ int gpr_replica_rte_unregister(char *proc_name_string)
ret = gpr_replica_rte_unregister_nl(proc_name_string);
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
gpr_replica_process_callbacks();
return ret;
}
@ -745,7 +755,7 @@ ompi_list_t* gpr_replica_get(ompi_registry_mode_t addr_mode,
ompi_list_t* list;
OMPI_THREAD_LOCK(&mca_gpr_replica_mutex);
list = gpr_replica_get_nl(addr_mode, segment, tokens);
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
OMPI_THREAD_UNLOCK(&mca_gpr_replica_mutex);
return list;
}

Просмотреть файл

@ -39,6 +39,21 @@ typedef struct mca_gpr_replica_t mca_gpr_replica_t;
OBJ_CLASS_DECLARATION(mca_gpr_replica_t);
/*
* Callback list "head"
*/
struct mca_gpr_replica_callbacks_t {
ompi_list_item_t item;
ompi_registry_notify_cb_fn_t cb_func;
ompi_registry_notify_message_t *message;
ompi_process_name_t *requestor;
int remote_idtag;
void *user_tag;
};
typedef struct mca_gpr_replica_callbacks_t mca_gpr_replica_callbacks_t;
OBJ_CLASS_DECLARATION(mca_gpr_replica_callbacks_t);
/** Dictionary of token-key pairs.
* This structure is used to create a linked list of token-key pairs. All calls to
* registry functions pass character string tokens for programming clarity - the ompi_keytable
@ -177,6 +192,7 @@ OBJ_CLASS_DECLARATION(mca_gpr_replica_segment_t);
*/
extern mca_gpr_replica_t mca_gpr_replica_head;
extern ompi_list_t mca_gpr_replica_notify_request_tracker;
extern ompi_list_t mca_gpr_replica_callbacks;
extern mca_gpr_notify_id_t mca_gpr_replica_last_notify_id_tag;
extern ompi_list_t mca_gpr_replica_free_notify_id_tags;
extern int mca_gpr_replica_debug;

Просмотреть файл

@ -81,6 +81,7 @@ static bool initialized = false;
*/
mca_gpr_replica_t mca_gpr_replica_head;
ompi_list_t mca_gpr_replica_notify_request_tracker;
ompi_list_t mca_gpr_replica_callbacks;
mca_gpr_notify_id_t mca_gpr_replica_last_notify_id_tag;
ompi_list_t mca_gpr_replica_free_notify_id_tags;
int mca_gpr_replica_debug;
@ -126,6 +127,34 @@ OBJ_CLASS_INSTANCE(
mca_gpr_replica_keylist_destructor); /* destructor */
/* constructor - used to initialize state of callback list instance */
static void mca_gpr_replica_callbacks_construct(mca_gpr_replica_callbacks_t* cb)
{
cb->cb_func = NULL;
cb->message = NULL;
cb->requestor = NULL;
cb->remote_idtag = 0;
cb->user_tag = NULL;
}
/* destructor - used to free any resources held by instance */
static void mca_gpr_replica_callbacks_destructor(mca_gpr_replica_callbacks_t* cb)
{
if (NULL != cb->requestor) {
free(cb->requestor);
}
}
/* define instance of ompi_class_t */
OBJ_CLASS_INSTANCE(
mca_gpr_replica_callbacks_t, /* type name */
ompi_list_item_t, /* parent "class" name */
mca_gpr_replica_callbacks_construct, /* constructor */
mca_gpr_replica_callbacks_destructor); /* destructor */
/* constructor - used to initialize state of trigger list instance */
static void mca_gpr_replica_trigger_list_construct(mca_gpr_replica_trigger_list_t* trig)
{
@ -365,6 +394,9 @@ mca_gpr_base_module_t *mca_gpr_replica_init(bool *allow_multi_user_threads, bool
ompi_output(0, "req tracker setup");
}
/* initialize the callback list head */
OBJ_CONSTRUCT(&mca_gpr_replica_callbacks, ompi_list_t);
/* issue the non-blocking receive */
rc = mca_oob_recv_packed_nb(MCA_OOB_NAME_ANY, MCA_OOB_TAG_GPR, 0, mca_gpr_replica_recv, NULL);
if(rc != OMPI_SUCCESS && rc != OMPI_ERR_NOT_IMPLEMENTED) {
@ -773,6 +805,8 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
/* RHC -- not sure what to do if the return send fails */
}
/* process any resulting callbacks */
gpr_replica_process_callbacks();
/***** UNSUBSCRIBE *****/
} else if (MCA_GPR_UNSUBSCRIBE_CMD == command) {
@ -907,6 +941,10 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
/* RHC -- not sure what to do if the return send fails */
}
/* process any resulting callbacks */
gpr_replica_process_callbacks();
/***** CANCEL SYNCHRO *****/
} else if (MCA_GPR_CANCEL_SYNCHRO_CMD == command) {
@ -1042,6 +1080,9 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
/* RHC -- not sure what to do if the return send fails */
}
/* process any resulting callbacks */
gpr_replica_process_callbacks();
/***** UNREGISTER *****/
@ -1072,6 +1113,10 @@ void mca_gpr_replica_recv(int status, ompi_process_name_t* sender,
/* RHC -- not sure what to do if the return send fails */
}
/* process any resulting callbacks */
gpr_replica_process_callbacks();
/***** TEST INTERNALS *****/
} else if (MCA_GPR_TEST_INTERNALS_CMD == command) {

Просмотреть файл

@ -693,10 +693,8 @@ bool gpr_replica_process_triggers(char *segment,
{
mca_gpr_replica_segment_t *seg;
mca_gpr_notify_request_tracker_t *trackptr;
ompi_registry_object_t *data;
char **tokptr;
int i;
bool found;
mca_gpr_replica_callbacks_t *cb;
if (mca_gpr_replica_debug) {
ompi_output(0, "[%d,%d,%d] gpr replica: process_trig entered", ompi_process_info.name->cellid,
@ -738,40 +736,18 @@ bool gpr_replica_process_triggers(char *segment,
}
/* process request */
if (NULL == trackptr->requestor) { /* local request - callback fn with their tag */
if (mca_gpr_replica_debug) {
ompi_output(0, "[%d,%d,%d] gpr replica-process_trig: local callback", ompi_process_info.name->cellid,
ompi_process_info.name->jobid, ompi_process_info.name->vpid);
}
trackptr->callback(message, trackptr->user_tag);
/* dismantle message and free memory */
while (NULL != (data = (ompi_registry_object_t*)ompi_list_remove_first(&message->data))) {
OBJ_RELEASE(data);
}
for (i=0, tokptr=message->tokens; i < message->num_tokens; i++, tokptr++) {
free(*tokptr);
*tokptr = NULL;
}
if (NULL != message->tokens) {
free(message->tokens);
message->tokens = NULL;
}
free(message);
message = NULL;
if (mca_gpr_replica_debug) {
ompi_output(0, "[%d,%d,%d] gpr replica-process_trig: data released", ompi_process_info.name->cellid,
ompi_process_info.name->jobid, ompi_process_info.name->vpid);
}
} else { /* remote request - send message back */
gpr_replica_remote_notify(trackptr->requestor, trackptr->req_tag, message);
if (mca_gpr_replica_debug) {
ompi_output(0, "[%d,%d,%d] gpr replica-process_trig: remote message sent", ompi_process_info.name->cellid,
ompi_process_info.name->jobid, ompi_process_info.name->vpid);
}
cb = OBJ_NEW(mca_gpr_replica_callbacks_t);
if (NULL == trackptr->requestor) { /* local request - queue callback fn with their tag */
cb->cb_func = trackptr->callback;
cb->message = message;
cb->user_tag = trackptr->user_tag;
} else { /* remote request - queue remote callback */
cb->requestor = ompi_name_server.copy_process_name(trackptr->requestor);
cb->remote_idtag = trackptr->req_tag;
cb->message = message;
}
ompi_list_append(&mca_gpr_replica_callbacks, &cb->item);
/* if one-shot, remove request from tracking system */
if (OMPI_REGISTRY_SYNCHRO_MODE_ONE_SHOT & trig->synch_mode) {
@ -792,6 +768,29 @@ bool gpr_replica_process_triggers(char *segment,
}
void gpr_replica_process_callbacks(void)
{
mca_gpr_replica_callbacks_t *cb;
while (NULL != (cb = (mca_gpr_replica_callbacks_t*)ompi_list_remove_first(&mca_gpr_replica_callbacks))) {
if (NULL == cb->requestor) { /* local callback */
if (mca_gpr_replica_debug) {
ompi_output(0, "process_callbacks: local");
}
cb->cb_func(cb->message, cb->user_tag);
} else { /* remote request - send message back */
if (mca_gpr_replica_debug) {
ompi_output(0, "process_callbacks: remote to [%d,%d,%d]", cb->requestor->cellid,
cb->requestor->jobid, cb->requestor->vpid);
}
gpr_replica_remote_notify(cb->requestor, cb->remote_idtag, cb->message);
}
OBJ_RELEASE(cb);
}
}
mca_gpr_notify_id_t gpr_replica_enter_notify_request(ompi_process_name_t *requestor,
mca_gpr_notify_id_t idtag,
ompi_registry_notify_cb_fn_t cb_func,

Просмотреть файл

@ -103,3 +103,4 @@ mca_gpr_notify_id_t gpr_replica_enter_notify_request(ompi_process_name_t *reques
mca_gpr_notify_id_t gpr_replica_remove_notify_request(mca_gpr_notify_id_t idtag);
void gpr_replica_process_callbacks(void);

Просмотреть файл

@ -39,7 +39,9 @@ void ompi_rte_all_procs_registered(ompi_registry_notify_message_t* match, void*
{
OMPI_THREAD_LOCK(&ompi_rte_mutex);
ompi_rte_job_started = true;
ompi_condition_signal(&ompi_rte_condition);
if (ompi_rte_waiting) {
ompi_condition_signal(&ompi_rte_condition);
}
OMPI_THREAD_UNLOCK(&ompi_rte_mutex);
}
@ -71,12 +73,14 @@ int ompi_rte_monitor_procs_registered(void)
/* block until a timeout occurs or all processes have registered */
gettimeofday(&tv, NULL);
ts.tv_sec = tv.tv_sec + 30;
ts.tv_sec = tv.tv_sec + 1000000;
ts.tv_nsec = 0;
OMPI_THREAD_LOCK(&ompi_rte_mutex);
if(ompi_rte_job_started == false) {
ompi_rte_waiting = true;
ompi_condition_timedwait(&ompi_rte_condition, &ompi_rte_mutex, &ts);
ompi_rte_waiting = false;
if(ompi_rte_job_started == false) {
ompi_mutex_unlock(&ompi_rte_mutex);
return OMPI_ERROR;
@ -93,6 +97,7 @@ int ompi_rte_monitor_procs_unregistered(void)
while(ompi_rte_job_finished == false) {
ompi_rte_waiting = true;
ompi_condition_wait(&ompi_rte_condition, &ompi_rte_mutex);
ompi_rte_waiting = false;
}
OMPI_THREAD_UNLOCK(&ompi_rte_mutex);

Просмотреть файл

@ -355,6 +355,16 @@ main(int argc, char *argv[])
NULL,
num_procs,
ompi_rte_all_procs_registered, NULL);
/* register a synchro on the segment so we get notified when everyone is gone
*/
rc = ompi_registry.synchro(
OMPI_REGISTRY_SYNCHRO_MODE_DESCENDING|OMPI_REGISTRY_SYNCHRO_MODE_ONE_SHOT,
OMPI_REGISTRY_OR,
segment,
NULL,
0,
ompi_rte_all_procs_unregistered, NULL);
/*
@ -369,18 +379,6 @@ main(int argc, char *argv[])
if (OMPI_SUCCESS != ompi_rte_monitor_procs_registered()) {
printf("procs didn't all register - aborting\n");
} else {
/* register a synchro on the segment so we get notified when everyone is gone
* this needs to occur after spawning processes so that mpirun gets notified
* last!
*/
rc = ompi_registry.synchro(
OMPI_REGISTRY_SYNCHRO_MODE_DESCENDING|OMPI_REGISTRY_SYNCHRO_MODE_ONE_SHOT,
OMPI_REGISTRY_OR,
segment,
NULL,
0,
ompi_rte_all_procs_unregistered, NULL);
ompi_rte_monitor_procs_unregistered();
}
/*