1
1

corrections for threading support

This commit was SVN r9292.
Этот коммит содержится в:
Tim Woodall 2006-03-16 00:06:48 +00:00
родитель c34f4c2cb7
Коммит 564c177922

Просмотреть файл

@ -73,16 +73,26 @@
*/
extern char **environ;
#if OMPI_HAVE_POSIX_THREADS && OMPI_THREADS_HAVE_DIFFERENT_PIDS
int orte_pls_bproc_launch_threaded(orte_jobid_t);
#endif
/**
* Initialization of the bproc module with all the needed function pointers
*/
orte_pls_base_module_t orte_pls_bproc_module = {
#if OMPI_HAVE_POSIX_THREADS && OMPI_THREADS_HAVE_DIFFERENT_PIDS
orte_pls_bproc_launch_threaded,
#else
orte_pls_bproc_launch,
#endif
orte_pls_bproc_terminate_job,
orte_pls_bproc_terminate_proc,
orte_pls_bproc_finalize
};
static int orte_pls_bproc_node_array(orte_rmaps_base_map_t* map,
int ** node_array, int * node_array_len);
static int orte_pls_bproc_node_list(int * node_array, int node_array_len,
@ -105,7 +115,7 @@ static int orte_pls_bproc_launch_daemons(orte_cellid_t cellid, char *** envp,
int ** node_arrays, int * node_array_lens,
int num_contexts, int num_procs,
orte_vpid_t global_vpid_start,
orte_jobid_t jobid);
orte_jobid_t jobid, int* num_daemons);
static int orte_pls_bproc_launch_app(orte_cellid_t cellid, orte_jobid_t jobid,
orte_rmaps_base_map_t* map, int num_processes,
orte_vpid_t vpid_start,
@ -469,7 +479,7 @@ static int orte_pls_bproc_launch_daemons(orte_cellid_t cellid, char *** envp,
int ** node_arrays, int * node_array_lens,
int num_contexts, int num_procs,
orte_vpid_t global_vpid_start,
orte_jobid_t jobid) {
orte_jobid_t jobid, int *num_launched) {
int * daemon_list = NULL;
int num_nodes = 0;
int num_daemons = 0;
@ -635,6 +645,7 @@ static int orte_pls_bproc_launch_daemons(orte_cellid_t cellid, char *** envp,
}
}
}
*num_launched = num_daemons;
cleanup:
if(NULL != argv) {
@ -802,8 +813,10 @@ int orte_pls_bproc_launch(orte_jobid_t jobid) {
int ** node_array = NULL;
int * node_array_len = NULL;
int num_processes = 0;
int num_daemons = 0;
int context = 0;
size_t idx, j;
int j;
size_t idx;
char cwd_save[OMPI_PATH_MAX + 1];
if (NULL == getcwd(cwd_save, sizeof(cwd_save))) {
@ -832,7 +845,6 @@ int orte_pls_bproc_launch(orte_jobid_t jobid) {
/* do a large lock so the processes will not decrement the process count
* until we are done launching */
OPAL_THREAD_LOCK(&mca_pls_bproc_component.lock);
for (item = opal_list_get_first(&mapping);
item != opal_list_get_end(&mapping);
@ -888,7 +900,7 @@ int orte_pls_bproc_launch(orte_jobid_t jobid) {
/* launch the daemons on all the nodes which have processes assign to them */
rc = orte_pls_bproc_launch_daemons(cellid, &map->app->env, node_array,
node_array_len, context, num_processes,
vpid_start, jobid);
vpid_start, jobid, &num_daemons);
if(ORTE_SUCCESS != rc) {
ORTE_ERROR_LOG(rc);
goto cleanup;
@ -897,7 +909,7 @@ int orte_pls_bproc_launch(orte_jobid_t jobid) {
/* wait for communication back from the daemons, which indicates they have
* sucessfully set up the pty/pipes and IO forwarding which the user apps
* will use */
for(j = 0; j < mca_pls_bproc_component.num_daemons; j++) {
for(j = 0; j < num_daemons; j++) {
orte_buffer_t ack;
OBJ_CONSTRUCT(&ack, orte_buffer_t);
rc = mca_oob_recv_packed(MCA_OOB_NAME_ANY, &ack, MCA_OOB_TAG_BPROC);
@ -958,7 +970,6 @@ int orte_pls_bproc_launch(orte_jobid_t jobid) {
mca_pls_bproc_component.done_launching = true;
cleanup:
chdir(cwd_save);
OPAL_THREAD_UNLOCK(&mca_pls_bproc_component.lock);
while(NULL != (item = opal_list_remove_first(&mapping))) {
OBJ_RELEASE(item);
}
@ -1032,3 +1043,73 @@ int orte_pls_bproc_finalize(void)
return ORTE_SUCCESS;
}
/*
* Handle threading issues.
*/
#if OMPI_HAVE_POSIX_THREADS && OMPI_THREADS_HAVE_DIFFERENT_PIDS
struct orte_pls_bproc_stack_t {
opal_condition_t cond;
opal_mutex_t mutex;
bool complete;
orte_jobid_t jobid;
int rc;
};
typedef struct orte_pls_bproc_stack_t orte_pls_bproc_stack_t;
static void orte_pls_bproc_stack_construct(orte_pls_bproc_stack_t* stack)
{
OBJ_CONSTRUCT(&stack->mutex, opal_mutex_t);
OBJ_CONSTRUCT(&stack->cond, opal_condition_t);
stack->rc = 0;
stack->complete = false;
}
static void orte_pls_bproc_stack_destruct(orte_pls_bproc_stack_t* stack)
{
OBJ_DESTRUCT(&stack->mutex);
OBJ_DESTRUCT(&stack->cond);
}
static OBJ_CLASS_INSTANCE(
orte_pls_bproc_stack_t,
opal_object_t,
orte_pls_bproc_stack_construct,
orte_pls_bproc_stack_destruct);
static void orte_pls_bproc_launch_cb(int fd, short event, void* args)
{
orte_pls_bproc_stack_t *stack = (orte_pls_bproc_stack_t*)args;
stack->rc = orte_pls_bproc_launch(stack->jobid);
OPAL_THREAD_LOCK(&stack->mutex);
stack->complete = true;
opal_condition_signal(&stack->cond);
OPAL_THREAD_UNLOCK(&stack->mutex);
}
int orte_pls_bproc_launch_threaded(orte_jobid_t jobid)
{
struct timeval tv = { 0, 0 };
struct opal_event event;
struct orte_pls_bproc_stack_t stack;
OBJ_CONSTRUCT(&stack, orte_pls_bproc_stack_t);
stack.jobid = jobid;
opal_evtimer_set(&event, orte_pls_bproc_launch_cb, &stack);
opal_evtimer_add(&event, &tv);
OPAL_THREAD_LOCK(&stack.mutex);
while(stack.complete == false)
opal_condition_wait(&stack.cond, &stack.mutex);
OPAL_THREAD_UNLOCK(&stack.mutex);
OBJ_DESTRUCT(&stack);
return stack.rc;
}
#endif