/* * Copyright (c) 2004-2008 The Trustees of Indiana University. * All rights reserved. * Copyright (c) 2004-2005 The Trustees of the University of Tennessee. * All rights reserved. * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, * University of Stuttgart. All rights reserved. * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow * * $HEADER$ */ /* * */ #include "orte_config.h" #include "orte/constants.h" #include #include #include #include #ifdef HAVE_UNISTD_H #include #endif /* HAVE_UNISTD_H */ #include "opal/mca/mca.h" #include "opal/mca/base/base.h" #include "opal/event/event.h" #include "orte/util/show_help.h" #include "opal/mca/base/mca_base_param.h" #include "orte/util/show_help.h" #include "opal/util/argv.h" #include "opal/util/opal_environ.h" #include "opal/threads/mutex.h" #include "opal/threads/threads.h" #include "opal/threads/condition.h" #include "orte/util/name_fns.h" #include "orte/util/proc_info.h" #include "orte/runtime/orte_globals.h" #include "orte/runtime/orte_wait.h" #include "orte/mca/filem/filem.h" #include "orte/mca/filem/base/base.h" #include "filem_rsh.h" /********** * Local Function and Variable Declarations **********/ static int orte_filem_rsh_start_copy(orte_filem_base_request_t *request); static int orte_filem_rsh_start_rm(orte_filem_base_request_t *request); static int orte_filem_rsh_start_command(orte_filem_base_process_set_t *proc_set, orte_filem_base_file_set_t *file_set, char * command, orte_filem_base_request_t *request, int index); static int start_child(char * command, orte_filem_base_request_t *request, int index); static int orte_filem_rsh_query_remote_path(char **remote_ref, orte_process_name_t *proc, int *flag); static void filem_rsh_waitpid_cb(pid_t pid, int status, void* cbdata); /* Permission to send functionality */ static int orte_filem_rsh_permission_listener_init(orte_rml_buffer_callback_fn_t rml_cbfunc); static int orte_filem_rsh_permission_listener_cancel(void); static void orte_filem_rsh_permission_callback(int status, orte_process_name_t* sender, opal_buffer_t *buffer, orte_rml_tag_t tag, void* cbdata); static int orte_filem_rsh_permission_ask(orte_process_name_t* sender, int num_sends); static int permission_send_done(orte_process_name_t* sender, int num_avail); static int permission_send_num_allowed(orte_process_name_t* sender, int num_allowed); /************* * Local work pool structure *************/ int cur_num_incomming = 0; int cur_num_outgoing = 0; static bool work_pool_all_done = false; static opal_mutex_t work_pool_lock; static opal_condition_t work_pool_cond; /* * work_pool_waiting: * - processes that are waiting for permission to put() to me */ opal_list_t work_pool_waiting; /* * work_pool_pending: * - put requests waiting on permission to send to peer */ opal_list_t work_pool_pending; /* * work_pool_active: * - put requests currently sending */ opal_list_t work_pool_active; struct orte_filem_rsh_work_pool_item_t { /** This is an object, so must have a super */ opal_list_item_t super; /** Command to exec */ char * command; /** Pointer to FileM Request */ orte_filem_base_request_t *request; /** Index into the request */ int index; /** Process Set */ orte_filem_base_process_set_t proc_set; /** File Set */ orte_filem_base_file_set_t file_set; /** If this item is active */ bool active; }; typedef struct orte_filem_rsh_work_pool_item_t orte_filem_rsh_work_pool_item_t; OBJ_CLASS_DECLARATION(orte_filem_rsh_work_pool_item_t); void orte_filem_rsh_work_pool_construct(orte_filem_rsh_work_pool_item_t *obj); void orte_filem_rsh_work_pool_destruct( orte_filem_rsh_work_pool_item_t *obj); OBJ_CLASS_INSTANCE(orte_filem_rsh_work_pool_item_t, opal_list_item_t, orte_filem_rsh_work_pool_construct, orte_filem_rsh_work_pool_destruct); void orte_filem_rsh_work_pool_construct(orte_filem_rsh_work_pool_item_t *obj) { obj->command = NULL; obj->request = NULL; obj->index = 0; OBJ_CONSTRUCT(&(obj->proc_set), orte_filem_base_process_set_t); OBJ_CONSTRUCT(&(obj->file_set), orte_filem_base_file_set_t); obj->active = false; } void orte_filem_rsh_work_pool_destruct( orte_filem_rsh_work_pool_item_t *obj) { if( NULL != obj->command ) { free(obj->command); obj->command = NULL; } if( NULL != obj->request ) { OBJ_RELEASE(obj->request); obj->request = NULL; } obj->index = 0; OBJ_DESTRUCT(&(obj->proc_set)); OBJ_DESTRUCT(&(obj->file_set)); obj->active = false; } /* * Rsh module */ static orte_filem_base_module_t loc_module = { /** Initialization Function */ orte_filem_rsh_module_init, /** Finalization Function */ orte_filem_rsh_module_finalize, orte_filem_rsh_put, orte_filem_rsh_put_nb, orte_filem_rsh_get, orte_filem_rsh_get_nb, orte_filem_rsh_rm, orte_filem_rsh_rm_nb, orte_filem_rsh_wait, orte_filem_rsh_wait_all }; /* * MCA Functions */ int orte_filem_rsh_component_query(mca_base_module_t **module, int *priority) { OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle, "filem:rsh: component_query()")); *priority = mca_filem_rsh_component.super.priority; *module = (mca_base_module_t *)&loc_module; return ORTE_SUCCESS; } int orte_filem_rsh_module_init(void) { int ret; OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle, "filem:rsh: module_init()")); /* * Allocate the work pools */ OBJ_CONSTRUCT(&work_pool_waiting, opal_list_t); OBJ_CONSTRUCT(&work_pool_pending, opal_list_t); OBJ_CONSTRUCT(&work_pool_active, opal_list_t); OBJ_CONSTRUCT(&work_pool_lock, opal_mutex_t); OBJ_CONSTRUCT(&work_pool_cond, opal_condition_t); work_pool_all_done = false; /* * Start the listener for permission */ if( ORTE_SUCCESS != (ret = orte_filem_rsh_permission_listener_init(orte_filem_rsh_permission_callback) ) ) { opal_output(mca_filem_rsh_component.super.output_handle, "filem:rsh:init Failed to start listener\n"); return ret; } /* start the base receive */ if (ORTE_SUCCESS != (ret = orte_filem_base_comm_start())) { opal_output(mca_filem_rsh_component.super.output_handle, "filem:rsh:init Failed to start base receive\n"); return ret; } return ORTE_SUCCESS; } int orte_filem_rsh_module_finalize(void) { opal_list_item_t *item = NULL; OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle, "filem:rsh: module_finalize()")); /* * Make sure all active requests are completed */ while(0 < opal_list_get_size(&work_pool_active) ) { ; /* JJH TODO... */ } /* * Stop the listeners */ orte_filem_rsh_permission_listener_cancel(); /* * Stop the base receive */ orte_filem_base_comm_stop(); /* * Deallocate the work pools */ while( NULL != (item = opal_list_remove_first(&work_pool_waiting)) ) { OBJ_RELEASE(item); } OBJ_DESTRUCT(&work_pool_waiting); while( NULL != (item = opal_list_remove_first(&work_pool_pending)) ) { OBJ_RELEASE(item); } OBJ_DESTRUCT(&work_pool_pending); while( NULL != (item = opal_list_remove_first(&work_pool_active)) ) { OBJ_RELEASE(item); } OBJ_DESTRUCT(&work_pool_active); OBJ_DESTRUCT(&work_pool_lock); OBJ_DESTRUCT(&work_pool_cond); return ORTE_SUCCESS; } /****************** * Local functions ******************/ int orte_filem_rsh_put(orte_filem_base_request_t *request) { int ret; if( ORTE_SUCCESS != (ret = orte_filem_base_prepare_request(request, ORTE_FILEM_MOVE_TYPE_PUT) ) ) { opal_output(mca_filem_rsh_component.super.output_handle, "filem:rsh: put(): Failed to preare the request structure (%d)", ret); return ret; } if( ORTE_SUCCESS != (ret = orte_filem_rsh_start_copy(request) ) ) { opal_output(mca_filem_rsh_component.super.output_handle, "filem:rsh: put(): Failed to post the request (%d)", ret); return ret; } if( ORTE_SUCCESS != (ret = orte_filem_rsh_wait(request)) ) { opal_output(mca_filem_rsh_component.super.output_handle, "filem:rsh: put(): Failed to wait on the request (%d)", ret); return ret; } return ORTE_SUCCESS; } int orte_filem_rsh_put_nb(orte_filem_base_request_t *request) { int ret; if( ORTE_SUCCESS != (ret = orte_filem_base_prepare_request(request, ORTE_FILEM_MOVE_TYPE_PUT) ) ) { opal_output(mca_filem_rsh_component.super.output_handle, "filem:rsh: put(): Failed to preare the request structure (%d)", ret); return ret; } if( ORTE_SUCCESS != (ret = orte_filem_rsh_start_copy(request) ) ) { opal_output(mca_filem_rsh_component.super.output_handle, "filem:rsh: put(): Failed to post the request (%d)", ret); return ret; } return ORTE_SUCCESS; } int orte_filem_rsh_get(orte_filem_base_request_t *request) { int ret; if( ORTE_SUCCESS != (ret = orte_filem_base_prepare_request(request, ORTE_FILEM_MOVE_TYPE_GET) ) ) { opal_output(mca_filem_rsh_component.super.output_handle, "filem:rsh: get(): Failed to preare the request structure (%d)", ret); return ret; } if( ORTE_SUCCESS != (ret = orte_filem_rsh_start_copy(request) ) ) { opal_output(mca_filem_rsh_component.super.output_handle, "filem:rsh: get(): Failed to post the request (%d)", ret); return ret; } if( ORTE_SUCCESS != (ret = orte_filem_rsh_wait(request)) ) { opal_output(mca_filem_rsh_component.super.output_handle, "filem:rsh: get(): Failed to wait on the request (%d)", ret); return ret; } return ORTE_SUCCESS; } int orte_filem_rsh_get_nb(orte_filem_base_request_t *request) { int ret; if( ORTE_SUCCESS != (ret = orte_filem_base_prepare_request(request, ORTE_FILEM_MOVE_TYPE_GET) ) ) { opal_output(mca_filem_rsh_component.super.output_handle, "filem:rsh: get(): Failed to preare the request structure (%d)", ret); return ret; } if( ORTE_SUCCESS != (ret = orte_filem_rsh_start_copy(request) ) ) { opal_output(mca_filem_rsh_component.super.output_handle, "filem:rsh: get(): Failed to post the request (%d)", ret); return ret; } return ORTE_SUCCESS; } int orte_filem_rsh_rm(orte_filem_base_request_t *request) { int ret = ORTE_SUCCESS, exit_status = ORTE_SUCCESS; if( ORTE_SUCCESS != (ret = orte_filem_base_prepare_request(request, ORTE_FILEM_MOVE_TYPE_RM) ) ) { opal_output(mca_filem_rsh_component.super.output_handle, "filem:rsh: rm(): Failed to prepare on the request (%d)", ret); return ret; } if( ORTE_SUCCESS != (ret = orte_filem_rsh_start_rm(request) ) ) { opal_output(mca_filem_rsh_component.super.output_handle, "filem:rsh: rm(): Failed to start the request (%d)", ret); return ret; } if( ORTE_SUCCESS != (ret = orte_filem_rsh_wait(request)) ) { opal_output(mca_filem_rsh_component.super.output_handle, "filem:rsh: rm(): Failed to wait on the request (%d)", ret); return ret; } return exit_status; } int orte_filem_rsh_rm_nb(orte_filem_base_request_t *request) { int ret = ORTE_SUCCESS, exit_status = ORTE_SUCCESS; if( ORTE_SUCCESS != (ret = orte_filem_base_prepare_request(request, ORTE_FILEM_MOVE_TYPE_RM) ) ) { opal_output(mca_filem_rsh_component.super.output_handle, "filem:rsh: rm_nb(): Failed to prepare on the request (%d)", ret); return ret; } if( ORTE_SUCCESS != (ret = orte_filem_rsh_start_rm(request) ) ) { opal_output(mca_filem_rsh_component.super.output_handle, "filem:rsh: rm_nb(): Failed to start on the request (%d)", ret); return ret; } return exit_status; } int orte_filem_rsh_wait(orte_filem_base_request_t *request) { int exit_status = ORTE_SUCCESS; orte_filem_rsh_work_pool_item_t *wp_item = NULL; opal_list_item_t *item = NULL; int i; int num_finished = 0; bool found_match = false; /* * Stage 0: A pass to see if the request is already done */ for(i = 0; i < request->num_mv; ++i) { if( request->is_done[i] == true && request->is_active[i] == true) { ++num_finished; } } while(num_finished < request->num_mv ) { /* * Stage 1: Complete all active requests */ for(i = 0; i < request->num_mv; ++i) { /* If the child is still executing (active) then continue * checking other children */ if( request->is_done[i] == false && request->is_active[i] == true) { continue; } /* If the child is done executing (!active), but has not been * collected then do so */ else if( request->is_done[i] == true && request->is_active[i] == false) { /* * Find the reference in the active pool */ found_match = false; for (item = opal_list_get_first( &work_pool_active); item != opal_list_get_end( &work_pool_active); item = opal_list_get_next( item) ) { wp_item = (orte_filem_rsh_work_pool_item_t *)item; if(request == wp_item->request && i == wp_item->index) { found_match = true; break; } } /* If no match then assume on the pending list, and continue */ if( !found_match ) { continue; } OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle, "filem:rsh: wait(): Transfer complete. Cleanup\n")); opal_list_remove_item(&work_pool_active, item); /* Mark as fully done [active = true, done = true] * It does not make complete sense to call this 'active' when * it is obviously not, but this is a state [true, true] that * should only be reached if the transfer has finished * completely. */ request->is_done[i] = true; request->is_active[i] = true; /* Tell peer we are finished with a send */ permission_send_done(&(wp_item->proc_set.source), 1); OBJ_RELEASE(wp_item); wp_item = NULL; ++num_finished; } } /* * Wait for a child to complete */ if( num_finished < request->num_mv ) { OPAL_THREAD_LOCK(&work_pool_lock); opal_condition_wait(&work_pool_cond, &work_pool_lock); OPAL_THREAD_UNLOCK(&work_pool_lock); } } /* * Stage 2: Determine the return value */ for(i = 0; i < request->num_mv; ++i) { if( request->exit_status[i] < 0 ) { exit_status = request->exit_status[i]; } } return exit_status; } int orte_filem_rsh_wait_all(opal_list_t * request_list) { int ret = ORTE_SUCCESS, exit_status = ORTE_SUCCESS; opal_list_item_t *item = NULL; for (item = opal_list_get_first( request_list); item != opal_list_get_end( request_list); item = opal_list_get_next( item) ) { orte_filem_base_request_t *request = (orte_filem_base_request_t *) item; if( ORTE_SUCCESS != (ret = orte_filem_rsh_wait(request)) ) { opal_output(mca_filem_rsh_component.super.output_handle, "filem:rsh: wait_all(): Wait failed (%d)", ret); exit_status = ret; goto cleanup; } } cleanup: return exit_status; } /************************** * Support functions **************************/ static int orte_filem_rsh_start_copy(orte_filem_base_request_t *request) { int ret = ORTE_SUCCESS, exit_status = ORTE_SUCCESS; opal_list_item_t *f_item = NULL; opal_list_item_t *p_item = NULL; char *remote_machine = NULL; char *remote_file = NULL; char *command = NULL; char *dir_arg = NULL; int cur_index = 0; /* For each file pair */ for (f_item = opal_list_get_first( &request->file_sets); f_item != opal_list_get_end( &request->file_sets); f_item = opal_list_get_next( f_item) ) { orte_filem_base_file_set_t * f_set = (orte_filem_base_file_set_t*)f_item; /* For each process set */ for (p_item = opal_list_get_first( &request->process_sets); p_item != opal_list_get_end( &request->process_sets); p_item = opal_list_get_next( p_item) ) { orte_filem_base_process_set_t * p_set = (orte_filem_base_process_set_t*)p_item; if( request->movement_type == ORTE_FILEM_MOVE_TYPE_PUT ) { OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle, "filem:rsh: copy(): %s -> %s: Moving file %s to %s\n", ORTE_NAME_PRINT(&p_set->source), ORTE_NAME_PRINT(&p_set->sink), f_set->local_target, f_set->remote_target)); } else { OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle, "filem:rsh: copy(): %s -> %s: Moving file %s to %s\n", ORTE_NAME_PRINT(&p_set->source), ORTE_NAME_PRINT(&p_set->sink), f_set->remote_target, f_set->local_target)); } /* * Get the remote machine identifier from the process_name struct */ OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle, "filem:rsh: copy(): %s -> %s: Get node name.\n", ORTE_NAME_PRINT(&p_set->source), ORTE_NAME_PRINT(&p_set->sink))); if( ORTE_SUCCESS != (ret = orte_filem_base_get_proc_node_name(&p_set->source, &remote_machine))) { opal_output(mca_filem_rsh_component.super.output_handle, "filem:rsh: copy(): Get Node Name failed (%d)", ret); exit_status = ret; goto cleanup; } OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle, "filem:rsh: copy(): %s -> %s: Got node name: %s\n", ORTE_NAME_PRINT(&p_set->source), ORTE_NAME_PRINT(&p_set->sink), remote_machine)); /* * Fix the remote_filename. * If it is an absolute path, then assume it is valid for the remote server * ow then we must construct the correct path. */ OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle, "filem:rsh: copy(): %s -> %s: Query remote path (%s).\n", ORTE_NAME_PRINT(&p_set->source), ORTE_NAME_PRINT(&p_set->sink), f_set->remote_target)); remote_file = strdup(f_set->remote_target); if( ORTE_SUCCESS != (ret = orte_filem_rsh_query_remote_path(&remote_file, &p_set->source, &f_set->target_flag) ) ) { opal_output(mca_filem_rsh_component.super.output_handle, "filem:rsh: copy(): Query Remote Path failed (%d)", ret); exit_status = ret; goto cleanup; } OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle, "filem:rsh: copy(): %s -> %s: Remote path (%s) is (%s).\n", ORTE_NAME_PRINT(&p_set->source), ORTE_NAME_PRINT(&p_set->sink), f_set->remote_target, remote_file)); /* * Transfer the file or directory */ if(ORTE_FILEM_TYPE_DIR == f_set->target_flag) { dir_arg = strdup(" -r "); } else if(ORTE_FILEM_TYPE_UNKNOWN == f_set->target_flag) { opal_output(mca_filem_rsh_component.super.output_handle, "filem:rsh: copy(): Error: File type unknown"); goto continue_set; } else { dir_arg = strdup(""); } /* * If this is the put() routine */ if( request->movement_type == ORTE_FILEM_MOVE_TYPE_PUT ) { asprintf(&command, "%s %s %s %s:%s ", mca_filem_rsh_component.cp_command, dir_arg, f_set->local_target, remote_machine, remote_file); OPAL_OUTPUT_VERBOSE((17, mca_filem_rsh_component.super.output_handle, "filem:rsh:put about to execute [%s]", command)); if( ORTE_SUCCESS != (ret = orte_filem_rsh_start_command(p_set, f_set, command, request, cur_index)) ) { exit_status = ret; goto cleanup; } } /* * ow it is the get() routine */ else { asprintf(&command, "%s %s %s:%s %s ", mca_filem_rsh_component.cp_command, dir_arg, remote_machine, remote_file, f_set->local_target); OPAL_OUTPUT_VERBOSE((17, mca_filem_rsh_component.super.output_handle, "filem:rsh:get about to execute [%s]", command)); if( ORTE_SUCCESS != (ret = orte_filem_rsh_start_command(p_set, f_set, command, request, cur_index)) ) { exit_status = ret; goto cleanup; } } continue_set: /* A small bit of cleanup */ if( NULL != dir_arg) { free(dir_arg); dir_arg = NULL; } if( NULL != remote_file) { free(remote_file); remote_file = NULL; } if(NULL != remote_machine) { free(remote_machine); remote_machine = NULL; } ++cur_index; } /* For each process set */ } /* For each file pair */ cleanup: if( NULL != command ) free(command); if( NULL != remote_machine) free(remote_machine); if( NULL != dir_arg) free(dir_arg); if( NULL != remote_file) free(remote_file); return exit_status; } static int orte_filem_rsh_start_rm(orte_filem_base_request_t *request) { int ret = ORTE_SUCCESS, exit_status = ORTE_SUCCESS; opal_list_item_t *f_item = NULL; opal_list_item_t *p_item = NULL; char *command = NULL; char *remote_machine = NULL; char *remote_targets = NULL; char *remote_file = NULL; char *dir_arg = NULL; char **remote_file_set = NULL; int argc = 0; int cur_index = 0; /* For each process set */ for (p_item = opal_list_get_first( &request->process_sets); p_item != opal_list_get_end( &request->process_sets); p_item = opal_list_get_next( p_item) ) { orte_filem_base_process_set_t * p_set = (orte_filem_base_process_set_t*)p_item; /* * Get the remote machine identifier from the process_name struct */ if( ORTE_SUCCESS != (ret = orte_filem_base_get_proc_node_name(&p_set->source, &remote_machine))) { exit_status = ret; goto cleanup; } /* For each file pair */ for (f_item = opal_list_get_first( &request->file_sets); f_item != opal_list_get_end( &request->file_sets); f_item = opal_list_get_next( f_item) ) { orte_filem_base_file_set_t * f_set = (orte_filem_base_file_set_t*)f_item; /* * Fix the remote_filename. * If it is an absolute path, then assume it is valid for the remote server * ow then we must construct the correct path. */ remote_file = strdup(f_set->remote_target); if( ORTE_SUCCESS != (ret = orte_filem_rsh_query_remote_path(&remote_file, &p_set->source, &f_set->target_flag) ) ) { exit_status = ret; goto cleanup; } if(ORTE_FILEM_TYPE_UNKNOWN == f_set->target_flag) { continue; } opal_argv_append(&argc, &remote_file_set, remote_file); /* * If we are removing a directory in the mix, then we * need the recursive argument. */ if(NULL == dir_arg) { if(ORTE_FILEM_TYPE_DIR == f_set->target_flag) { dir_arg = strdup(" -rf "); } } } /* All File Pairs */ if(NULL == dir_arg) { dir_arg = strdup(" -f "); } remote_targets = opal_argv_join(remote_file_set, ' '); asprintf(&command, "%s %s rm %s %s ", mca_filem_rsh_component.remote_sh_command, remote_machine, dir_arg, remote_targets); OPAL_OUTPUT_VERBOSE((15, mca_filem_rsh_component.super.output_handle, "filem:rsh:rm about to execute [%s]", command)); if( ORTE_SUCCESS != (ret = orte_filem_rsh_start_command(p_set, NULL, command, request, cur_index)) ) { exit_status = ret; goto cleanup; } /* A small bit of cleanup */ if( NULL != dir_arg) { free(dir_arg); dir_arg = NULL; } if( NULL != remote_targets) { free(remote_targets); remote_targets = NULL; } if( NULL != remote_file_set) { opal_argv_free(remote_file_set); remote_file_set = NULL; } if(NULL != remote_machine) { free(remote_machine); remote_machine = NULL; } ++cur_index; } /* Process set */ cleanup: if( NULL != command ) free(command); if( NULL != remote_machine) free(remote_machine); if( NULL != dir_arg) free(dir_arg); if( NULL != remote_targets) free(remote_targets); if( NULL != remote_file_set) opal_argv_free(remote_file_set); return exit_status; } /****************** * Local Functions ******************/ /****************************** * Work Pool functions ******************************/ static int orte_filem_rsh_start_command(orte_filem_base_process_set_t *proc_set, orte_filem_base_file_set_t *file_set, char * command, orte_filem_base_request_t *request, int index) { orte_filem_rsh_work_pool_item_t *wp_item = NULL; int ret; /* Construct a work pool item */ wp_item = OBJ_NEW(orte_filem_rsh_work_pool_item_t); /* Copy the Process Set */ if( NULL != proc_set ) { wp_item->proc_set.source.jobid = proc_set->source.jobid; wp_item->proc_set.source.vpid = proc_set->source.vpid; wp_item->proc_set.sink.jobid = proc_set->sink.jobid; wp_item->proc_set.sink.vpid = proc_set->sink.vpid; } /* Copy the File Set */ if( NULL != file_set ) { wp_item->file_set.local_target = strdup(file_set->local_target); wp_item->file_set.remote_target = strdup(file_set->remote_target); wp_item->file_set.target_flag = file_set->target_flag; } OBJ_RETAIN(request); wp_item->command = strdup(command); wp_item->request = request; wp_item->index = index; /* * - put the request on the pending list * - wait for the peer to tell us that it can receive */ opal_list_append(&work_pool_pending, &(wp_item->super)); /* * Ask for permission to send this file so we do not overwhelm the peer */ OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle, "filem:rsh: start_command(): Ask permission to send from proc %s", ORTE_NAME_PRINT(&(proc_set->source)))); if( ORTE_SUCCESS != (ret = orte_filem_rsh_permission_ask(&(proc_set->source), 1)) ) { return ret; } return ORTE_SUCCESS; } /****************************** * Child process start and wait functions ******************************/ static int start_child(char * command, orte_filem_base_request_t *request, int index) { char **argv = NULL; int status, ret; OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle, "filem:rsh: start_child(): Starting the command [%s]", command)); /* fork() -> done = false, active = true */ request->is_done[index] = false; request->is_active[index] = true; request->exit_status[index] = fork(); if( request->exit_status[index] == 0 ) { /* Child */ /* Redirect stdout to /dev/null */ freopen( "/dev/null", "w", stdout); argv = opal_argv_split(command, ' '); status = execvp(argv[0], argv); opal_output(0, "filem:rsh:start_child Failed to exec child [%s] status = %d\n", command, status); exit(ORTE_ERROR); } else if( request->exit_status[index] > 0 ) { OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle, "filem:rsh: start_child(): Started Child %d Running command [%s]", request->exit_status[index], command)); /* * Register a callback for when this process exits */ if( ORTE_SUCCESS != (ret = orte_wait_cb(request->exit_status[index], filem_rsh_waitpid_cb, NULL) ) ) { opal_output(0, "filem:rsh: start_child(): Failed to register a waitpid callback for child [%d] executing the command [%s]\n", request->exit_status[index], command); return ret; } } else { return ORTE_ERROR; } return ORTE_SUCCESS; } static void filem_rsh_waitpid_cb(pid_t pid, int status, void* cbdata) { orte_filem_rsh_work_pool_item_t *wp_item = NULL; orte_filem_base_request_t *request; opal_list_item_t *item = NULL; int index; OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle, "filem:rsh: waitpid_cb(): Pid %d finished with status [%d].\n", pid, status)); /* * Find this pid in the active queue */ OPAL_THREAD_LOCK(&work_pool_lock); for (item = opal_list_get_first( &work_pool_active); item != opal_list_get_end( &work_pool_active); item = opal_list_get_next( item) ) { wp_item = (orte_filem_rsh_work_pool_item_t *)item; request = wp_item->request; index = wp_item->index; if( request->is_done[index] == false && request->exit_status[index] == pid ) { request->exit_status[index] = status; /* waitpid() -> done = true, active = false */ request->is_done[index] = true; request->is_active[index] = false; OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle, "filem:rsh: waitpid_cb(): Marked pid %d as complete [status = %d].\n", pid, status)); break; } } /* * Signal in case anyone is waiting for a child to finish. */ opal_condition_signal(&work_pool_cond); OPAL_THREAD_UNLOCK(&work_pool_lock); } /****************************** * Path resolution functions ******************************/ /* * This function is paired with the filem_base_process_get_remote_path_cmd() function on the remote machine */ static int orte_filem_rsh_query_remote_path(char **remote_ref, orte_process_name_t *peer, int *flag) { int ret; #if 0 /* An optimization if we are guarenteed that this remote files exists. * Then the 'scp -r' option will work with both files and directories. * JJH: For general correctness disable this piece of code. */ if( *remote_ref[0] == '/' ) { *flag = ORTE_FILEM_TYPE_DIR; return ORTE_SUCCESS; } #endif /* Call the base function */ if( ORTE_SUCCESS != (ret = orte_filem_base_get_remote_path(remote_ref, peer, flag) ) ) { return ret; } return ORTE_SUCCESS; } /****************************** * Permission functions ******************************/ static int orte_filem_rsh_permission_listener_init(orte_rml_buffer_callback_fn_t rml_cbfunc) { int ret; if( ORTE_SUCCESS != (ret = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_FILEM_RSH, ORTE_RML_PERSISTENT, rml_cbfunc, NULL)) ) { opal_output(mca_filem_rsh_component.super.output_handle, "filem:rsh: listener_init: Failed to register the receive callback (%d)", ret); return ret; } return ORTE_SUCCESS; } static int orte_filem_rsh_permission_listener_cancel(void) { int ret; if( ORTE_SUCCESS != (ret = orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_FILEM_RSH) ) ) { #if 0 opal_output(mca_filem_rsh_component.super.output_handle, "filem:rsh: listener_cancel: Failed to deregister the receive callback (%d)", ret); #endif return ret; } return ORTE_SUCCESS; } static void orte_filem_rsh_permission_callback(int status, orte_process_name_t* sender, opal_buffer_t *buffer, orte_rml_tag_t tag, void* cbdata) { orte_filem_rsh_work_pool_item_t *wp_item = NULL; opal_list_item_t *item = NULL; int ret; orte_std_cntr_t n; int num_req, num_allowed = 0; int perm_flag, i; OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle, "filem:rsh: permission_callback(? ?): Peer %s ...", ORTE_NAME_PRINT(sender))); /* * Receive the flag indicating if this is: * - Asking for permission (ORTE_FILEM_RSH_ASK) * - Allowing us to send (ORTE_FILEM_RSH_ALLOW) */ n = 1; if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &perm_flag, &n, OPAL_INT))) { goto cleanup; } /* Asking for permission to send */ if( ORTE_FILEM_RSH_ASK == perm_flag ) { OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle, "filem:rsh: permission_callback(ASK): Peer %s Asking permission to send [Used %d of %d]", ORTE_NAME_PRINT(sender), cur_num_incomming, orte_filem_rsh_max_incomming)); /* * Receive the requested amount */ n = 1; if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &num_req, &n, OPAL_INT))) { goto cleanup; } /* * Determine how many we can allow * if none then put a request on the waiting list * ow tell the peer to start sending now. * Send back number allowed to be started */ if( orte_filem_rsh_max_incomming < cur_num_incomming + 1) { /* Add to the waiting list */ OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle, "filem:rsh: permission_callback(ASK): Add Peer %s request to waiting list", ORTE_NAME_PRINT(sender))); wp_item = OBJ_NEW(orte_filem_rsh_work_pool_item_t); wp_item->proc_set.source.jobid = sender->jobid; wp_item->proc_set.source.vpid = sender->vpid; opal_list_append(&work_pool_waiting, &(wp_item->super)); } /* Start the transfer immediately */ else { num_allowed = 1; cur_num_incomming += 1; OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle, "filem:rsh: permission_callback(ASK): Respond to Peer %s with %d", ORTE_NAME_PRINT(sender), num_allowed)); permission_send_num_allowed(sender, num_allowed); } } /* Allowing us to start some number of sends */ else if( ORTE_FILEM_RSH_ALLOW == perm_flag ) { OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle, "filem:rsh: permission_callback(ALLOW): Peer %s Allowing me to send", ORTE_NAME_PRINT(sender))); /* * Receive the allowed transmit amount */ n = 1; if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &num_req, &n, OPAL_INT))) { goto cleanup; } /* * For each alloacted spot for transmit * - Get a pending request directed at this peer * - Start the pending request */ for(i = 0; i < num_req; ++i ) { if( 0 >= opal_list_get_size(&work_pool_pending) ) { OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle, "filem:rsh: permission_callback(ALLOW): No more pending sends to peer %s...", ORTE_NAME_PRINT(sender))); break; } for (item = opal_list_get_first( &work_pool_pending); item != opal_list_get_end( &work_pool_pending); item = opal_list_get_next( item) ) { wp_item = (orte_filem_rsh_work_pool_item_t *)item; if(sender->jobid == wp_item->proc_set.source.jobid && sender->vpid == wp_item->proc_set.source.vpid ) { opal_list_remove_item( &work_pool_pending, item); break; } } if( item == opal_list_get_end(&work_pool_pending) ) { OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle, "filem:rsh: permission_callback(ALLOW): Unable to find message on the pending list\n")); } OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle, "filem:rsh: permission_callback(ALLOW): Starting to send to peer %s... (# pending = %d)", ORTE_NAME_PRINT(sender), (int)opal_list_get_size(&work_pool_pending))); wp_item->active = true; opal_list_append(&work_pool_active, &(wp_item->super)); if( ORTE_SUCCESS != (ret = start_child(wp_item->command, wp_item->request, wp_item->index)) ) { goto cleanup; } } } /* Peer said they are done sending one or more files */ else if( ORTE_FILEM_RSH_DONE == perm_flag ) { OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle, "filem:rsh: permission_callback(DONE): Peer %s is done sending to me", ORTE_NAME_PRINT(sender))); /* * Receive the number of open slots */ n = 1; if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &num_req, &n, OPAL_INT))) { goto cleanup; } cur_num_incomming -= num_req; /* * For each open slot, notify a waiting peer that it may send */ for(i = 0; i < num_req; ++i ) { item = opal_list_get_first( &work_pool_waiting); if( item != opal_list_get_end( &work_pool_waiting) ) { wp_item = (orte_filem_rsh_work_pool_item_t *)item; num_allowed = 1; cur_num_incomming += 1; opal_list_remove_item(&work_pool_waiting, item); permission_send_num_allowed(&(wp_item->proc_set.source), num_allowed); OBJ_RELEASE(wp_item); } } } cleanup: return; } static int orte_filem_rsh_permission_ask(orte_process_name_t* source, int num_sends) { int ret, exit_status = ORTE_SUCCESS; opal_buffer_t loc_buffer; int perm_flag = ORTE_FILEM_RSH_ASK; OBJ_CONSTRUCT(&loc_buffer, opal_buffer_t); if (ORTE_SUCCESS != (ret = opal_dss.pack(&loc_buffer, &perm_flag, 1, OPAL_INT))) { exit_status = ret; goto cleanup; } if (ORTE_SUCCESS != (ret = opal_dss.pack(&loc_buffer, &num_sends, 1, OPAL_INT))) { exit_status = ret; goto cleanup; } if (0 > (ret = orte_rml.send_buffer(source, &loc_buffer, ORTE_RML_TAG_FILEM_RSH, 0))) { exit_status = ret; goto cleanup; } cleanup: OBJ_DESTRUCT(&loc_buffer); return exit_status; } static int permission_send_done(orte_process_name_t* peer, int num_avail) { int ret, exit_status = ORTE_SUCCESS; opal_buffer_t loc_buffer; int perm_flag = ORTE_FILEM_RSH_DONE; OBJ_CONSTRUCT(&loc_buffer, opal_buffer_t); if (ORTE_SUCCESS != (ret = opal_dss.pack(&loc_buffer, &perm_flag, 1, OPAL_INT))) { exit_status = ret; goto cleanup; } if (ORTE_SUCCESS != (ret = opal_dss.pack(&loc_buffer, &num_avail, 1, OPAL_INT))) { exit_status = ret; goto cleanup; } if (0 > (ret = orte_rml.send_buffer(peer, &loc_buffer, ORTE_RML_TAG_FILEM_RSH, 0))) { exit_status = ret; goto cleanup; } cleanup: OBJ_DESTRUCT(&loc_buffer); return exit_status; } static int permission_send_num_allowed(orte_process_name_t* peer, int num_allowed) { int ret, exit_status = ORTE_SUCCESS; opal_buffer_t loc_buffer; int perm_flag = ORTE_FILEM_RSH_ALLOW; OBJ_CONSTRUCT(&loc_buffer, opal_buffer_t); if (ORTE_SUCCESS != (ret = opal_dss.pack(&loc_buffer, &perm_flag, 1, OPAL_INT))) { exit_status = ret; goto cleanup; } if (ORTE_SUCCESS != (ret = opal_dss.pack(&loc_buffer, &num_allowed, 1, OPAL_INT))) { exit_status = ret; goto cleanup; } if (0 > (ret = orte_rml.send_buffer(peer, &loc_buffer, ORTE_RML_TAG_FILEM_RSH, 0))) { exit_status = ret; goto cleanup; } cleanup: OBJ_DESTRUCT(&loc_buffer); return exit_status; }