1
1
openmpi/orte/mca/filem/rsh/filem_rsh_module.c
Ralph Castain 9b59d8de6f This is actually a much smaller commit than it appears at first glance - it just touches a lot of files. The --without-rte-support configuration option has never really been implemented completely. The option caused various objects not to be defined and conditionally compiled some base functions, but did nothing to prevent build of the component libraries. Unfortunately, since many of those components use objects covered by the option, it caused builds to break if those components were allowed to build.
Brian dealt with this in the past by creating platform files and using "no-build" to block the components. This was clunky, but acceptable when only one organization was using that option. However, that number has now expanded to at least two more locations.

Accordingly, make --without-rte-support actually work by adding appropriate configury to prevent components from building when they shouldn't. While doing so, remove two frameworks (db and rmcast) that are no longer used as ORCM comes to a close (besides, they belonged in ORCM now anyway). Do some minor cleanups along the way.

This commit was SVN r25497.
2011-11-22 21:24:35 +00:00

1669 lines
58 KiB
C

/*
* Copyright (c) 2004-2010 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2011 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
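/*
* RSH module for the ORTE FileM framework: copies and removes files by
* running the configured copy / remote-shell commands as child processes.
*/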
#include "orte_config.h"
#ifdef HAVE_STRING_H
#include <string.h>
#endif
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/wait.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif /* HAVE_UNISTD_H */
#include "opal/mca/mca.h"
#include "opal/mca/base/base.h"
#include "opal/mca/base/mca_base_param.h"
#include "opal/mca/event/event.h"
#include "orte/constants.h"
#include "orte/util/show_help.h"
#include "opal/util/argv.h"
#include "opal/util/output.h"
#include "opal/util/opal_environ.h"
#include "opal/util/basename.h"
#include "opal/threads/mutex.h"
#include "opal/threads/condition.h"
#include "orte/util/name_fns.h"
#include "orte/util/proc_info.h"
#include "orte/runtime/orte_globals.h"
#include "orte/runtime/orte_wait.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/rml/rml_types.h"
#include "orte/mca/filem/filem.h"
#include "orte/mca/filem/base/base.h"
#include "filem_rsh.h"
/**********
* Local Function and Variable Declarations
**********/
/* Launch the copy (put/get) commands for every file/process pair of a request */
static int orte_filem_rsh_start_copy(orte_filem_base_request_t *request);
/* Launch one remote 'rm' command per process set of a request */
static int orte_filem_rsh_start_rm(orte_filem_base_request_t *request);
/* Wrap a command in a work-pool item and queue it on the held or pending list */
static int orte_filem_rsh_start_command(orte_filem_base_process_set_t *proc_set,
orte_filem_base_file_set_t *file_set,
char * command,
orte_filem_base_request_t *request,
int index);
/* fork()/exec() the command for entry 'index' of the request */
static int start_child(char * command,
orte_filem_base_request_t *request,
int index);
/* Resolve *remote_ref on the node hosting 'proc'; presumably reports the
* target's type (file/dir/unknown) through *flag -- definition not in view */
static int orte_filem_rsh_query_remote_path(char **remote_ref,
orte_process_name_t *proc,
int *flag);
/* Exit callback registered through orte_wait_cb() for each started child */
static void filem_rsh_waitpid_cb(pid_t pid, int status, void* cbdata);
/* Permission to send functionality */
/* Start / stop the listener for permission messages */
static int orte_filem_rsh_permission_listener_init(void);
static int orte_filem_rsh_permission_listener_cancel(void);
/* Receive callback for permission messages (definition not in view) */
static void orte_filem_rsh_permission_callback(int status,
orte_process_name_t* sender,
opal_buffer_t *buffer,
orte_rml_tag_t tag,
void* cbdata);
/* Ask 'sender' for permission to perform num_sends transfers */
static int orte_filem_rsh_permission_ask(orte_process_name_t* sender, int num_sends);
/* Tell the peer that transfers have finished, with their outcome */
static int permission_send_done(orte_process_name_t* sender, int num_avail,
int32_t exit_status,
char * local_target,
char * remote_target,
char * command);
static int permission_send_num_allowed(orte_process_name_t* sender, int num_allowed);
/*************
* Local work pool structure
*************/
/* NOTE(review): not referenced in the visible part of this file; presumably
* counts incoming transfers -- TODO confirm. ("incomming" spelling kept
* because the symbol has external linkage.) */
int cur_num_incomming = 0;
/* Number of queued sends that were allowed past the throttle in
* orte_filem_rsh_start_command() (compared against orte_filem_rsh_max_outgoing) */
int cur_num_outgoing = 0;
/* Reset to false by module_init(); other uses are outside this view */
static bool work_pool_all_done = false;
/* Lock/condition pair: orte_filem_rsh_wait() sleeps on work_pool_cond */
static opal_mutex_t work_pool_lock;
static opal_condition_t work_pool_cond;
/*
* work_pool_waiting:
* - processes that are waiting for my permission to put() to me
*/
opal_list_t work_pool_waiting;
/*
* work_pool_held:
* - requests that are held before asking permission to reduce load
*/
opal_list_t work_pool_held;
/*
* work_pool_pending:
* - put requests waiting on permission to send to peer
*/
opal_list_t work_pool_pending;
/*
* work_pool_active:
* - put requests currently sending
*/
opal_list_t work_pool_active;
/*
* One queued or running transfer/removal command.  Items move between the
* work_pool_* lists; the item owns 'command' and holds a reference on
* 'request' (both released by the destructor).
*/
struct orte_filem_rsh_work_pool_item_t {
/** This is an object, so must have a super */
opal_list_item_t super;
/** Command to exec */
char * command;
/** Pointer to FileM Request */
orte_filem_base_request_t *request;
/** Index into the request */
int index;
/** Process Set */
orte_filem_base_process_set_t proc_set;
/** File Set */
orte_filem_base_file_set_t file_set;
/** If this item is active */
bool active;
};
typedef struct orte_filem_rsh_work_pool_item_t orte_filem_rsh_work_pool_item_t;
OBJ_CLASS_DECLARATION(orte_filem_rsh_work_pool_item_t);
/* Constructor / destructor registered with the OPAL object system below */
void orte_filem_rsh_work_pool_construct(orte_filem_rsh_work_pool_item_t *obj);
void orte_filem_rsh_work_pool_destruct( orte_filem_rsh_work_pool_item_t *obj);
OBJ_CLASS_INSTANCE(orte_filem_rsh_work_pool_item_t,
opal_list_item_t,
orte_filem_rsh_work_pool_construct,
orte_filem_rsh_work_pool_destruct);
/*
 * Constructor: the embedded process and file sets are constructed in
 * place; the item starts with no command, no request, slot 0, inactive.
 */
void orte_filem_rsh_work_pool_construct(orte_filem_rsh_work_pool_item_t *obj)
{
    OBJ_CONSTRUCT(&obj->proc_set, orte_filem_base_process_set_t);
    OBJ_CONSTRUCT(&obj->file_set, orte_filem_base_file_set_t);

    obj->command = NULL;
    obj->request = NULL;
    obj->index   = 0;
    obj->active  = false;
}
/*
 * Destructor: free the owned command string, drop the reference held on
 * the request (taken when the item was queued), and tear down the
 * embedded process/file sets.
 */
void orte_filem_rsh_work_pool_destruct( orte_filem_rsh_work_pool_item_t *obj)
{
    /* free(NULL) is a no-op, so no guard is needed here */
    free(obj->command);
    obj->command = NULL;

    if (NULL != obj->request) {
        OBJ_RELEASE(obj->request);
        obj->request = NULL;
    }

    obj->index = 0;

    OBJ_DESTRUCT(&obj->proc_set);
    OBJ_DESTRUCT(&obj->file_set);

    obj->active = false;
}
/*
* Rsh module
*
* Function table returned by orte_filem_rsh_component_query().  The
* initializer is positional, so the order must follow the field order of
* orte_filem_base_module_t.
*/
static orte_filem_base_module_t loc_module = {
/** Initialization Function */
orte_filem_rsh_module_init,
/** Finalization Function */
orte_filem_rsh_module_finalize,
/** Blocking put: prepare, start the copies, then wait */
orte_filem_rsh_put,
/** Non-blocking put: prepare and start the copies only */
orte_filem_rsh_put_nb,
/** Blocking get */
orte_filem_rsh_get,
/** Non-blocking get */
orte_filem_rsh_get_nb,
/** Blocking remove */
orte_filem_rsh_rm,
/** Non-blocking remove */
orte_filem_rsh_rm_nb,
/** Wait for one request */
orte_filem_rsh_wait,
/** Wait for every request in a list */
orte_filem_rsh_wait_all
};
/*
 * MCA Functions
 */
/*
 * Component query: hand back this component's module together with the
 * priority configured on the component.  Selection is never declined.
 */
int orte_filem_rsh_component_query(mca_base_module_t **module, int *priority)
{
    OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                         "filem:rsh: component_query()"));

    *module   = (mca_base_module_t *) &loc_module;
    *priority = mca_filem_rsh_component.super.priority;

    return ORTE_SUCCESS;
}
/*
 * Module initialization.
 *
 * Marks the module idle, constructs the work pools together with the
 * lock/condition pair used by orte_filem_rsh_wait(), then starts the
 * permission listener and the base receive.
 *
 * Returns ORTE_SUCCESS, or the error from whichever start-up step failed.
 */
int orte_filem_rsh_module_init(void)
{
    int rc;

    orte_filem_base_is_active = false;

    OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                         "filem:rsh: module_init()"));

    /* Empty work pools plus their synchronization objects */
    OBJ_CONSTRUCT(&work_pool_waiting, opal_list_t);
    OBJ_CONSTRUCT(&work_pool_held,    opal_list_t);
    OBJ_CONSTRUCT(&work_pool_pending, opal_list_t);
    OBJ_CONSTRUCT(&work_pool_active,  opal_list_t);
    OBJ_CONSTRUCT(&work_pool_lock,    opal_mutex_t);
    OBJ_CONSTRUCT(&work_pool_cond,    opal_condition_t);
    work_pool_all_done = false;

    /* Listener for permission messages from peers */
    rc = orte_filem_rsh_permission_listener_init();
    if (ORTE_SUCCESS != rc) {
        opal_output(mca_filem_rsh_component.super.output_handle,
                    "filem:rsh:init Failed to start listener\n");
        return rc;
    }

    /* Base receive */
    rc = orte_filem_base_comm_start();
    if (ORTE_SUCCESS != rc) {
        opal_output(mca_filem_rsh_component.super.output_handle,
                    "filem:rsh:init Failed to start base receive\n");
    }

    return rc;
}
int orte_filem_rsh_module_finalize(void)
{
opal_list_item_t *item = NULL;
OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
"filem:rsh: module_finalize()"));
/*
* Make sure all active requests are completed
*/
if( orte_filem_base_is_active ) {
while(0 < opal_list_get_size(&work_pool_active) ) {
opal_progress();
}
}
orte_filem_base_is_active = false;
/*
* Stop the listeners
*/
orte_filem_rsh_permission_listener_cancel();
/*
* Stop the base receive
*/
orte_filem_base_comm_stop();
/*
* Deallocate the work pools
*/
while( NULL != (item = opal_list_remove_first(&work_pool_waiting)) ) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&work_pool_waiting);
while( NULL != (item = opal_list_remove_first(&work_pool_held)) ) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&work_pool_held);
while( NULL != (item = opal_list_remove_first(&work_pool_pending)) ) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&work_pool_pending);
while( NULL != (item = opal_list_remove_first(&work_pool_active)) ) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&work_pool_active);
OBJ_DESTRUCT(&work_pool_lock);
OBJ_DESTRUCT(&work_pool_cond);
return ORTE_SUCCESS;
}
/******************
 * Local functions
 ******************/
/*
 * Blocking put: prepare the request for a PUT movement, launch the copies,
 * and wait for all of them to finish.
 *
 * Returns ORTE_SUCCESS, or the error from the first step that failed.
 * On return the module is flagged active only if transfers remain in flight.
 */
int orte_filem_rsh_put(orte_filem_base_request_t *request)
{
    int rc;

    orte_filem_base_is_active = true;

    rc = orte_filem_base_prepare_request(request, ORTE_FILEM_MOVE_TYPE_PUT);
    if (ORTE_SUCCESS != rc) {
        opal_output(mca_filem_rsh_component.super.output_handle,
                    "filem:rsh: put(): Failed to prepare the request structure (%d)", rc);
    } else if (ORTE_SUCCESS != (rc = orte_filem_rsh_start_copy(request))) {
        opal_output(mca_filem_rsh_component.super.output_handle,
                    "filem:rsh: put(): Failed to post the request (%d)", rc);
    } else if (ORTE_SUCCESS != (rc = orte_filem_rsh_wait(request))) {
        opal_output(mca_filem_rsh_component.super.output_handle,
                    "filem:rsh: put(): Failed to wait on the request (%d)", rc);
    }

    orte_filem_base_is_active = (0 < opal_list_get_size(&work_pool_active));

    return rc;
}
/*
 * Non-blocking put: prepare the request for a PUT movement and launch the
 * copies without waiting for them.  Completion is presumably collected
 * later by the caller through orte_filem_rsh_wait()/orte_filem_rsh_wait_all().
 *
 * Returns ORTE_SUCCESS, or the error from the first step that failed.
 * On return the module is flagged active only if transfers remain in flight.
 *
 * Log messages identify this entry point as "put_nb()", as rm_nb() does,
 * so failures are not mistaken for the blocking put().
 */
int orte_filem_rsh_put_nb(orte_filem_base_request_t *request)
{
    int ret, exit_status = ORTE_SUCCESS;

    orte_filem_base_is_active = true;

    if (ORTE_SUCCESS != (ret = orte_filem_base_prepare_request(request, ORTE_FILEM_MOVE_TYPE_PUT))) {
        opal_output(mca_filem_rsh_component.super.output_handle,
                    "filem:rsh: put_nb(): Failed to prepare the request structure (%d)", ret);
        exit_status = ret;
        goto cleanup;
    }

    if (ORTE_SUCCESS != (ret = orte_filem_rsh_start_copy(request))) {
        opal_output(mca_filem_rsh_component.super.output_handle,
                    "filem:rsh: put_nb(): Failed to post the request (%d)", ret);
        exit_status = ret;
        goto cleanup;
    }

cleanup:
    /* Stay flagged active only while transfers are still in flight */
    orte_filem_base_is_active = (0 < opal_list_get_size(&work_pool_active));
    return exit_status;
}
/*
 * Blocking get: prepare the request for a GET movement, launch the copies,
 * and wait for all of them to finish.
 *
 * Returns ORTE_SUCCESS, or the error from the first step that failed.
 * On return the module is flagged active only if transfers remain in flight.
 */
int orte_filem_rsh_get(orte_filem_base_request_t *request)
{
    int rc;

    orte_filem_base_is_active = true;

    rc = orte_filem_base_prepare_request(request, ORTE_FILEM_MOVE_TYPE_GET);
    if (ORTE_SUCCESS != rc) {
        opal_output(mca_filem_rsh_component.super.output_handle,
                    "filem:rsh: get(): Failed to prepare the request structure (%d)", rc);
    } else if (ORTE_SUCCESS != (rc = orte_filem_rsh_start_copy(request))) {
        opal_output(mca_filem_rsh_component.super.output_handle,
                    "filem:rsh: get(): Failed to post the request (%d)", rc);
    } else if (ORTE_SUCCESS != (rc = orte_filem_rsh_wait(request))) {
        opal_output(mca_filem_rsh_component.super.output_handle,
                    "filem:rsh: get(): Failed to wait on the request (%d)", rc);
    }

    orte_filem_base_is_active = (0 < opal_list_get_size(&work_pool_active));

    return rc;
}
/*
 * Non-blocking get: prepare the request for a GET movement and launch the
 * copies without waiting for them.  Completion is presumably collected
 * later by the caller through orte_filem_rsh_wait()/orte_filem_rsh_wait_all().
 *
 * Returns ORTE_SUCCESS, or the error from the first step that failed.
 * On return the module is flagged active only if transfers remain in flight.
 *
 * Log messages identify this entry point as "get_nb()", as rm_nb() does,
 * so failures are not mistaken for the blocking get().
 */
int orte_filem_rsh_get_nb(orte_filem_base_request_t *request)
{
    int ret, exit_status = ORTE_SUCCESS;

    orte_filem_base_is_active = true;

    if (ORTE_SUCCESS != (ret = orte_filem_base_prepare_request(request, ORTE_FILEM_MOVE_TYPE_GET))) {
        opal_output(mca_filem_rsh_component.super.output_handle,
                    "filem:rsh: get_nb(): Failed to prepare the request structure (%d)", ret);
        exit_status = ret;
        goto cleanup;
    }

    if (ORTE_SUCCESS != (ret = orte_filem_rsh_start_copy(request))) {
        opal_output(mca_filem_rsh_component.super.output_handle,
                    "filem:rsh: get_nb(): Failed to post the request (%d)", ret);
        exit_status = ret;
        goto cleanup;
    }

cleanup:
    /* Stay flagged active only while transfers are still in flight */
    orte_filem_base_is_active = (0 < opal_list_get_size(&work_pool_active));
    return exit_status;
}
/*
 * Blocking remove: prepare the request for an RM movement, launch the
 * remote removals, and wait for all of them to finish.
 *
 * Returns ORTE_SUCCESS, or the error from the first step that failed.
 * On return the module is flagged active only if transfers remain in flight.
 */
int orte_filem_rsh_rm(orte_filem_base_request_t *request)
{
    int rc;

    orte_filem_base_is_active = true;

    rc = orte_filem_base_prepare_request(request, ORTE_FILEM_MOVE_TYPE_RM);
    if (ORTE_SUCCESS != rc) {
        opal_output(mca_filem_rsh_component.super.output_handle,
                    "filem:rsh: rm(): Failed to prepare on the request (%d)", rc);
    } else if (ORTE_SUCCESS != (rc = orte_filem_rsh_start_rm(request))) {
        opal_output(mca_filem_rsh_component.super.output_handle,
                    "filem:rsh: rm(): Failed to start the request (%d)", rc);
    } else if (ORTE_SUCCESS != (rc = orte_filem_rsh_wait(request))) {
        opal_output(mca_filem_rsh_component.super.output_handle,
                    "filem:rsh: rm(): Failed to wait on the request (%d)", rc);
    }

    orte_filem_base_is_active = (0 < opal_list_get_size(&work_pool_active));

    return rc;
}
/*
 * Non-blocking remove: prepare the request for an RM movement and launch
 * the remote removals without waiting for them.
 *
 * Returns ORTE_SUCCESS, or the error from the first step that failed.
 * On return the module is flagged active only if transfers remain in flight.
 */
int orte_filem_rsh_rm_nb(orte_filem_base_request_t *request)
{
    int rc;

    orte_filem_base_is_active = true;

    rc = orte_filem_base_prepare_request(request, ORTE_FILEM_MOVE_TYPE_RM);
    if (ORTE_SUCCESS != rc) {
        opal_output(mca_filem_rsh_component.super.output_handle,
                    "filem:rsh: rm_nb(): Failed to prepare on the request (%d)", rc);
    } else {
        rc = orte_filem_rsh_start_rm(request);
        if (ORTE_SUCCESS != rc) {
            opal_output(mca_filem_rsh_component.super.output_handle,
                        "filem:rsh: rm_nb(): Failed to start on the request (%d)", rc);
        }
    }

    orte_filem_base_is_active = (0 < opal_list_get_size(&work_pool_active));

    return rc;
}
/*
* Block until every entry of 'request' is fully completed.
*
* Each entry i is tracked by the pair (is_done[i], is_active[i]):
*   [false, true ] - child command still running (set by start_child())
*   [true,  false] - child finished but not yet collected here
*                    (presumably set by filem_rsh_waitpid_cb(), whose body
*                    is only partly visible -- TODO confirm)
*   [true,  true ] - fully done: collected here, or resolved without ever
*                    starting a child (e.g. skipped/failed in start_copy())
*
* Collecting an entry removes its item from work_pool_active, notifies the
* source peer via permission_send_done(), and releases the item.
*
* Returns ORTE_SUCCESS, or the LAST negative exit_status[] value among the
* entries (the scan keeps overwriting).
* NOTE(review): for collected entries exit_status[] holds whatever the
* waitpid callback stored (it stores the 'status' it is handed).  If that
* is a raw waitpid() status it is non-negative, so a failing child command
* would not be reported as an error here -- TODO confirm.
*/
int orte_filem_rsh_wait(orte_filem_base_request_t *request)
{
int exit_status = ORTE_SUCCESS;
orte_filem_rsh_work_pool_item_t *wp_item = NULL;
opal_list_item_t *item = NULL;
int i;
int num_finished = 0;
bool found_match = false;
/*
* Stage 0: A pass to see if the request is already done
*/
for(i = 0; i < request->num_mv; ++i) {
if( request->is_done[i] == true &&
request->is_active[i] == true) {
++num_finished;
}
}
while(num_finished < request->num_mv ) {
/*
* Stage 1: Complete all active requests
*/
for(i = 0; i < request->num_mv; ++i) {
/* If the child is still executing (active) then continue
* checking other children
*/
if( request->is_done[i] == false &&
request->is_active[i] == true) {
continue;
}
/* If the child is done executing (!active), but has not been
* collected then do so
*/
else if( request->is_done[i] == true &&
request->is_active[i] == false) {
/*
* Find the reference in the active pool
*/
found_match = false;
for (item = opal_list_get_first( &work_pool_active);
item != opal_list_get_end( &work_pool_active);
item = opal_list_get_next( item) ) {
wp_item = (orte_filem_rsh_work_pool_item_t *)item;
if(request == wp_item->request &&
i == wp_item->index) {
found_match = true;
break;
}
}
/* If no match then assume on the pending list, and continue */
if( !found_match ) {
continue;
}
OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
"filem:rsh: wait(): Transfer complete. Cleanup\n"));
opal_list_remove_item(&work_pool_active, item);
/* Mark as fully done [active = true, done = true]
* It does not make complete sense to call this 'active' when
* it is obviously not, but this is a state [true, true] that
* should only be reached if the transfer has finished
* completely.
*/
request->is_done[i] = true;
request->is_active[i] = true;
/* Tell peer we are finished with a send */
permission_send_done(&(wp_item->proc_set.source), 1, request->exit_status[i],
wp_item->file_set.local_target,
wp_item->file_set.remote_target,
wp_item->command);
/* Drops the item (and, via its destructor, its request reference) */
OBJ_RELEASE(wp_item);
wp_item = NULL;
++num_finished;
}
}
/*
* Wait for a child to complete
* Sleep on work_pool_cond until signalled; the signaller is presumably
* the waitpid callback, which takes work_pool_lock -- TODO confirm.
*/
if( num_finished < request->num_mv ) {
OPAL_THREAD_LOCK(&work_pool_lock);
opal_condition_wait(&work_pool_cond,
&work_pool_lock);
OPAL_THREAD_UNLOCK(&work_pool_lock);
}
}
/*
* Stage 2: Determine the return value
*/
for(i = 0; i < request->num_mv; ++i) {
if( request->exit_status[i] < 0 ) {
exit_status = request->exit_status[i];
}
}
/* Stay flagged active only while transfers remain in flight */
if( 0 < opal_list_get_size(&work_pool_active) ) {
orte_filem_base_is_active = true;
} else {
orte_filem_base_is_active = false;
}
return exit_status;
}
/*
 * Wait, in list order, for every request in 'request_list'.
 *
 * Stops at the first request whose wait fails and returns that error;
 * otherwise returns ORTE_SUCCESS.  When orte_filem_rsh_progress_meter > 0,
 * a completion percentage is printed the first time and whenever it has
 * advanced by at least that many points since the last report.
 */
int orte_filem_rsh_wait_all(opal_list_t * request_list)
{
    int rc = ORTE_SUCCESS;
    int total = (int) opal_list_get_size(request_list);
    int completed = 0;
    double last_reported = 0.0;
    opal_list_item_t *cursor = NULL;

    for (cursor = opal_list_get_first(request_list);
         cursor != opal_list_get_end(request_list);
         cursor = opal_list_get_next(cursor)) {
        int wait_rc = orte_filem_rsh_wait((orte_filem_base_request_t *) cursor);

        if (ORTE_SUCCESS != wait_rc) {
            opal_output(mca_filem_rsh_component.super.output_handle,
                        "filem:rsh: wait_all(): Wait failed (%d)", wait_rc);
            rc = wait_rc;
            break;
        }

        /* Optional progress meter */
        if (OPAL_UNLIKELY(orte_filem_rsh_progress_meter > 0)) {
            double remaining;
            double percent;

            ++completed;
            remaining = (total - completed) / (1.0 * total);
            percent = (remaining - 1) * (-100.0);

            if (last_reported == 0.0 ||
                percent >= (last_reported + orte_filem_rsh_progress_meter)) {
                last_reported = percent;
                opal_output(0, "filem:rsh: progress: %10.2f %c Finished\n",
                            percent, '%');
            }
        }
    }

    orte_filem_base_is_active = (0 < opal_list_get_size(&work_pool_active));

    return rc;
}
/**************************
 * Support functions
 **************************/
/*
 * Launch the copy commands for a request prepared for
 * ORTE_FILEM_MOVE_TYPE_PUT or ORTE_FILEM_MOVE_TYPE_GET.
 *
 * One slot of request->is_done / is_active / exit_status is used per
 * (file set, process set) pair, with file sets in the outer loop.  A pair
 * that needs no transfer, or fails local validation, is marked fully done
 * ([done = true, active = true]) with exit status 0 (nothing to do) or -1
 * (error).  Every other pair becomes a copy command that is handed to
 * orte_filem_rsh_start_command(), which stores its own copy of the string.
 *
 * Returns ORTE_SUCCESS, or the first error hit while looking up a node
 * name, querying a remote path, building a command, or queueing it.
 * Processing stops at that pair.
 */
static int orte_filem_rsh_start_copy(orte_filem_base_request_t *request) {
    int ret = ORTE_SUCCESS, exit_status = ORTE_SUCCESS;
    opal_list_item_t *f_item = NULL;
    opal_list_item_t *p_item = NULL;
    char *remote_machine = NULL;
    char *remote_file = NULL;
    char *command = NULL;
    const char *dir_arg = NULL;   /* points at a string literal; never freed */
    int cur_index = 0;
    int len;

    /* For each file pair */
    for (f_item = opal_list_get_first(&request->file_sets);
         f_item != opal_list_get_end(&request->file_sets);
         f_item = opal_list_get_next(f_item)) {
        orte_filem_base_file_set_t *f_set = (orte_filem_base_file_set_t *) f_item;

        /* For each process set */
        for (p_item = opal_list_get_first(&request->process_sets);
             p_item != opal_list_get_end(&request->process_sets);
             p_item = opal_list_get_next(p_item)) {
            orte_filem_base_process_set_t *p_set = (orte_filem_base_process_set_t *) p_item;
            bool is_local = (OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL,
                                                                          &p_set->source,
                                                                          &p_set->sink));

            /*
             * Same process on both ends and the very same path: nothing to do.
             * The paths must match exactly.  A prefix match would treat e.g.
             * "/tmp/ab" and "/tmp/a" as one file and silently skip the copy.
             */
            if (is_local && 0 == strcmp(f_set->local_target, f_set->remote_target)) {
                request->is_done[cur_index] = true;
                request->is_active[cur_index] = true;
                request->exit_status[cur_index] = 0;
                goto continue_set;
            }

            if (request->movement_type == ORTE_FILEM_MOVE_TYPE_PUT) {
                /* The file must exist locally if we are going to put it somewhere else */
                if (0 != access(f_set->local_target, R_OK)) {
                    OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                                         "filem:rsh: copy(): %s -> %s: Error: Cannot move file %s to %s. Does not exist at source\n",
                                         ORTE_NAME_PRINT(&p_set->source),
                                         ORTE_NAME_PRINT(&p_set->sink),
                                         f_set->local_target,
                                         f_set->remote_target));
                    orte_show_help("help-orte-filem-rsh.txt",
                                   "orte-filem-rsh:get-file-not-exist",
                                   true, f_set->local_target, orte_process_info.nodename);
                    request->is_done[cur_index] = true;
                    request->is_active[cur_index] = true;
                    request->exit_status[cur_index] = -1;
                    goto continue_set;
                }
            }
            /* Do not check a local get() operation, to help suppress the warnings from the HNP */
            else if (!is_local) {
                char *remote_base = NULL;
                char *base = NULL;

                /* opal_basename() returns a newly allocated string */
                remote_base = opal_basename(f_set->remote_target);
                if (NULL == remote_base ||
                    0 > asprintf(&base, "%s/%s", f_set->local_target, remote_base)) {
                    free(remote_base);
                    exit_status = ORTE_ERR_OUT_OF_RESOURCE;
                    goto cleanup;
                }
                free(remote_base);
                remote_base = NULL;

                /*
                 * The file should not exist if we are getting a file with the
                 * same name, since we do not want to overwrite it without the
                 * user's consent.
                 */
                if (0 == access(base, R_OK)) {
                    OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                                         "filem:rsh: copy(): %s -> %s: Error: Cannot move file %s to %s. Already exists at destination (%s)\n",
                                         ORTE_NAME_PRINT(&p_set->source),
                                         ORTE_NAME_PRINT(&p_set->sink),
                                         f_set->remote_target,
                                         f_set->local_target, base));
                    orte_show_help("help-orte-filem-rsh.txt",
                                   "orte-filem-rsh:get-file-exists",
                                   true, f_set->local_target, orte_process_info.nodename);
                    free(base);
                    request->is_done[cur_index] = true;
                    request->is_active[cur_index] = true;
                    request->exit_status[cur_index] = -1;
                    goto continue_set;
                }
                free(base);
            }

            if (request->movement_type == ORTE_FILEM_MOVE_TYPE_PUT) {
                OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                                     "filem:rsh: copy(): %s -> %s: Moving file %s %s to %s %s\n",
                                     ORTE_NAME_PRINT(&p_set->source),
                                     ORTE_NAME_PRINT(&p_set->sink),
                                     (f_set->local_hint == ORTE_FILEM_HINT_SHARED ? "(S)" : ""),
                                     f_set->local_target,
                                     (f_set->remote_hint == ORTE_FILEM_HINT_SHARED ? "(S)" : ""),
                                     f_set->remote_target));
            } else {
                OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                                     "filem:rsh: copy(): %s -> %s: Moving file %s %s to %s %s\n",
                                     ORTE_NAME_PRINT(&p_set->source),
                                     ORTE_NAME_PRINT(&p_set->sink),
                                     (f_set->remote_hint == ORTE_FILEM_HINT_SHARED ? "(S)" : ""),
                                     f_set->remote_target,
                                     (f_set->local_hint == ORTE_FILEM_HINT_SHARED ? "(S)" : ""),
                                     f_set->local_target));
            }

            /* Get the remote machine identifier from the process name */
            OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                                 "filem:rsh: copy(): %s -> %s: Get node name.\n",
                                 ORTE_NAME_PRINT(&p_set->source),
                                 ORTE_NAME_PRINT(&p_set->sink)));
            if (ORTE_SUCCESS != (ret = orte_filem_base_get_proc_node_name(&p_set->source, &remote_machine))) {
                opal_output(mca_filem_rsh_component.super.output_handle,
                            "filem:rsh: copy(): Get Node Name failed (%d)", ret);
                exit_status = ret;
                goto cleanup;
            }
            OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                                 "filem:rsh: copy(): %s -> %s: Got node name: %s\n",
                                 ORTE_NAME_PRINT(&p_set->source),
                                 ORTE_NAME_PRINT(&p_set->sink),
                                 remote_machine));

            /*
             * Fix the remote filename.  An absolute path is assumed valid on
             * the remote server; otherwise the correct path is constructed.
             */
            OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                                 "filem:rsh: copy(): %s -> %s: Query remote path (%s).\n",
                                 ORTE_NAME_PRINT(&p_set->source),
                                 ORTE_NAME_PRINT(&p_set->sink),
                                 f_set->remote_target));
            remote_file = strdup(f_set->remote_target);
            if (NULL == remote_file) {
                exit_status = ORTE_ERR_OUT_OF_RESOURCE;
                goto cleanup;
            }
            if (ORTE_SUCCESS != (ret = orte_filem_rsh_query_remote_path(&remote_file, &p_set->source, &f_set->target_flag))) {
                opal_output(mca_filem_rsh_component.super.output_handle,
                            "filem:rsh: copy(): Query Remote Path failed (%d)", ret);
                exit_status = ret;
                goto cleanup;
            }
            OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                                 "filem:rsh: copy(): %s -> %s: Remote path (%s) is (%s).\n",
                                 ORTE_NAME_PRINT(&p_set->source),
                                 ORTE_NAME_PRINT(&p_set->sink),
                                 f_set->remote_target,
                                 remote_file));

            /* Transfer the file or directory */
            if (ORTE_FILEM_TYPE_DIR == f_set->target_flag) {
                dir_arg = " -r ";
            } else if (ORTE_FILEM_TYPE_UNKNOWN == f_set->target_flag) {
                opal_output(mca_filem_rsh_component.super.output_handle,
                            "filem:rsh: copy(): Error: File type unknown (%s)",
                            f_set->remote_target);
                request->is_done[cur_index] = true;
                request->is_active[cur_index] = true;
                request->exit_status[cur_index] = -1;
                goto continue_set;
            } else {
                dir_arg = "";
            }

            if (request->movement_type == ORTE_FILEM_MOVE_TYPE_PUT) {
                /* put(): use a local 'cp' when the destination is shared */
                if (f_set->remote_hint == ORTE_FILEM_HINT_SHARED) {
                    len = asprintf(&command, "%s %s %s %s ",
                                   mca_filem_rsh_component.cp_local_command,
                                   dir_arg,
                                   f_set->local_target,
                                   remote_file);
                } else {
                    len = asprintf(&command, "%s %s %s %s:%s ",
                                   mca_filem_rsh_component.cp_command,
                                   dir_arg,
                                   f_set->local_target,
                                   remote_machine,
                                   remote_file);
                }
            } else {
                /* get(): use a remote-shell 'cp' when the local side is shared */
                if (f_set->local_hint == ORTE_FILEM_HINT_SHARED) {
                    len = asprintf(&command, "%s %s %s %s %s %s ",
                                   mca_filem_rsh_component.remote_sh_command,
                                   remote_machine,
                                   mca_filem_rsh_component.cp_local_command,
                                   dir_arg,
                                   remote_file,
                                   f_set->local_target);
                } else {
                    len = asprintf(&command, "%s %s %s:%s %s ",
                                   mca_filem_rsh_component.cp_command,
                                   dir_arg,
                                   remote_machine,
                                   remote_file,
                                   f_set->local_target);
                }
            }
            if (0 > len) {
                /* asprintf() leaves the pointer undefined on failure */
                command = NULL;
                exit_status = ORTE_ERR_OUT_OF_RESOURCE;
                goto cleanup;
            }

            /* Start the command */
            OPAL_OUTPUT_VERBOSE((17, mca_filem_rsh_component.super.output_handle,
                                 "filem:rsh:%s about to execute [%s]",
                                 (request->movement_type == ORTE_FILEM_MOVE_TYPE_PUT ? "put" : "get"),
                                 command));
            ret = orte_filem_rsh_start_command(p_set, f_set, command, request, cur_index);
            /* start_command() strdup()s the command, so ours is freed on every pair */
            free(command);
            command = NULL;
            if (ORTE_SUCCESS != ret) {
                exit_status = ret;
                goto cleanup;
            }

        continue_set:
            /* Per-pair cleanup */
            free(remote_file);
            remote_file = NULL;
            free(remote_machine);
            remote_machine = NULL;

            ++cur_index;
        } /* For each process set */
    } /* For each file pair */

cleanup:
    free(command);
    free(remote_machine);
    free(remote_file);
    return exit_status;
}
/*
 * Launch the remote removals for a request prepared for
 * ORTE_FILEM_MOVE_TYPE_RM.
 *
 * For each process set, every file set whose remote type is known is
 * collected into a single "<remote_sh> <node> rm <flags> <files...>"
 * command.  "-rf" is used if any target is a directory, otherwise "-f".
 * The command is queued via orte_filem_rsh_start_command(); one request
 * slot is used per process set.
 *
 * Returns ORTE_SUCCESS, or the first error hit while looking up a node
 * name, querying a remote path, building a command, or queueing it.
 */
static int orte_filem_rsh_start_rm(orte_filem_base_request_t *request)
{
    int ret = ORTE_SUCCESS, exit_status = ORTE_SUCCESS;
    opal_list_item_t *f_item = NULL;
    opal_list_item_t *p_item = NULL;
    char *command = NULL;
    char *remote_machine = NULL;
    char *remote_targets = NULL;
    char *remote_file = NULL;
    const char *dir_arg = NULL;   /* points at a string literal; never freed */
    char **remote_file_set = NULL;
    int argc = 0;
    int cur_index = 0;

    /* For each process set */
    for (p_item = opal_list_get_first(&request->process_sets);
         p_item != opal_list_get_end(&request->process_sets);
         p_item = opal_list_get_next(p_item)) {
        orte_filem_base_process_set_t *p_set = (orte_filem_base_process_set_t *) p_item;

        /* Get the remote machine identifier from the process name */
        if (ORTE_SUCCESS != (ret = orte_filem_base_get_proc_node_name(&p_set->source, &remote_machine))) {
            exit_status = ret;
            goto cleanup;
        }

        /* The recursive flag is decided per process set */
        dir_arg = NULL;

        /* For each file pair */
        for (f_item = opal_list_get_first(&request->file_sets);
             f_item != opal_list_get_end(&request->file_sets);
             f_item = opal_list_get_next(f_item)) {
            orte_filem_base_file_set_t *f_set = (orte_filem_base_file_set_t *) f_item;

            /*
             * Fix the remote filename.  An absolute path is assumed valid on
             * the remote server; otherwise the correct path is constructed.
             */
            remote_file = strdup(f_set->remote_target);
            if (NULL == remote_file) {
                exit_status = ORTE_ERR_OUT_OF_RESOURCE;
                goto cleanup;
            }
            if (ORTE_SUCCESS != (ret = orte_filem_rsh_query_remote_path(&remote_file, &p_set->source, &f_set->target_flag))) {
                exit_status = ret;
                goto cleanup;
            }

            if (ORTE_FILEM_TYPE_UNKNOWN != f_set->target_flag) {
                /* opal_argv_append() stores its own copy of the string */
                opal_argv_append(&argc, &remote_file_set, remote_file);

                /* Removing a directory requires the recursive flag */
                if (NULL == dir_arg && ORTE_FILEM_TYPE_DIR == f_set->target_flag) {
                    dir_arg = " -rf ";
                }
            }

            /* Our copy is no longer needed, whether or not it was appended */
            free(remote_file);
            remote_file = NULL;
        } /* All File Pairs */

        if (NULL == dir_arg) {
            dir_arg = " -f ";
        }

        remote_targets = opal_argv_join(remote_file_set, ' ');
        if (NULL == remote_targets ||
            0 > asprintf(&command, "%s %s rm %s %s ",
                         mca_filem_rsh_component.remote_sh_command,
                         remote_machine,
                         dir_arg,
                         remote_targets)) {
            /* asprintf() leaves the pointer undefined on failure */
            command = NULL;
            exit_status = ORTE_ERR_OUT_OF_RESOURCE;
            goto cleanup;
        }

        OPAL_OUTPUT_VERBOSE((15, mca_filem_rsh_component.super.output_handle,
                             "filem:rsh:rm about to execute [%s]", command));

        ret = orte_filem_rsh_start_command(p_set, NULL, command, request, cur_index);
        /* start_command() strdup()s the command, so ours is freed every time */
        free(command);
        command = NULL;
        if (ORTE_SUCCESS != ret) {
            exit_status = ret;
            goto cleanup;
        }

        /* Per-process-set cleanup */
        free(remote_targets);
        remote_targets = NULL;
        if (NULL != remote_file_set) {
            opal_argv_free(remote_file_set);
            remote_file_set = NULL;
        }
        argc = 0;
        free(remote_machine);
        remote_machine = NULL;

        ++cur_index;
    } /* Process set */

cleanup:
    free(command);
    free(remote_machine);
    free(remote_file);
    free(remote_targets);
    if (NULL != remote_file_set) {
        opal_argv_free(remote_file_set);
    }
    return exit_status;
}
/******************
 * Local Functions
 ******************/
/******************************
 * Work Pool functions
 ******************************/
/*
 * Wrap 'command' (copied) in a new work-pool item for entry 'index' of
 * 'request' (a reference is retained), copying the process and file sets
 * when given.
 *
 * If orte_filem_rsh_max_outgoing > 0 and that many sends are already
 * outstanding, the item is parked on work_pool_held.  Otherwise it is
 * counted in cur_num_outgoing, appended to work_pool_pending, and
 * permission to send is requested from the source process.
 *
 * Returns ORTE_SUCCESS, ORTE_ERR_OUT_OF_RESOURCE if the item cannot be
 * allocated, or the error from the permission request.
 */
static int orte_filem_rsh_start_command(orte_filem_base_process_set_t *proc_set,
                                        orte_filem_base_file_set_t *file_set,
                                        char * command,
                                        orte_filem_base_request_t *request,
                                        int index)
{
    orte_filem_rsh_work_pool_item_t *wp_item = NULL;
    orte_process_name_t peer;
    int ret;

    /* Construct a work pool item */
    wp_item = OBJ_NEW(orte_filem_rsh_work_pool_item_t);
    if (NULL == wp_item) {
        return ORTE_ERR_OUT_OF_RESOURCE;
    }

    /* Copy the Process Set */
    if (NULL != proc_set) {
        wp_item->proc_set.source.jobid = proc_set->source.jobid;
        wp_item->proc_set.source.vpid = proc_set->source.vpid;
        ORTE_EPOCH_SET(wp_item->proc_set.source.epoch, proc_set->source.epoch);
        wp_item->proc_set.sink.jobid = proc_set->sink.jobid;
        wp_item->proc_set.sink.vpid = proc_set->sink.vpid;
        ORTE_EPOCH_SET(wp_item->proc_set.sink.epoch, proc_set->sink.epoch);
    }

    /* Copy the File Set */
    if (NULL != file_set) {
        wp_item->file_set.local_target = strdup(file_set->local_target);
        wp_item->file_set.remote_target = strdup(file_set->remote_target);
        wp_item->file_set.target_flag = file_set->target_flag;
    }

    OBJ_RETAIN(request);
    wp_item->command = strdup(command);
    wp_item->request = request;
    wp_item->index = index;

    /*
     * Use the item's own copy of the source name from here on: it is valid
     * even when no proc_set was supplied, and it is captured before the
     * item is handed to a list.
     */
    peer = wp_item->proc_set.source;

    if (orte_filem_rsh_max_outgoing > 0 && cur_num_outgoing >= orte_filem_rsh_max_outgoing) {
        OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                             "filem:rsh: start_command(): *** Hold send from proc %s (%d of %d)",
                             ORTE_NAME_PRINT(&peer), cur_num_outgoing, orte_filem_rsh_max_outgoing));
        /*
         * Throttle: at most orte_filem_rsh_max_outgoing sends may be
         * outstanding, so park this request on the held list.
         */
        opal_list_append(&work_pool_held, &(wp_item->super));
        return ORTE_SUCCESS;
    }

    ++cur_num_outgoing;

    /*
     * Put the request on the pending list and ask the peer for permission
     * to send, so we do not overwhelm it.  Only one file is requested at a
     * time.
     * JJH: Look into permission for multiple file permissions at a time
     */
    opal_list_append(&work_pool_pending, &(wp_item->super));

    OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                         "filem:rsh: start_command(): Ask permission to send from proc %s (%d of %d)",
                         ORTE_NAME_PRINT(&peer), cur_num_outgoing, orte_filem_rsh_max_outgoing));
    if (ORTE_SUCCESS != (ret = orte_filem_rsh_permission_ask(&peer, 1))) {
        return ret;
    }

    return ORTE_SUCCESS;
}
/******************************
 * Child process start and wait functions
 ******************************/
/*
 * fork() and exec() 'command' (split on single spaces, run via execvp())
 * on behalf of entry 'index' of 'request'.
 *
 * The entry is marked [done = false, active = true], and the child's pid
 * is stored in request->exit_status[index].  That stored pid is what
 * filem_rsh_waitpid_cb() matches against.  The child's stdout is sent to
 * /dev/null.
 *
 * Returns ORTE_SUCCESS once the child is running with an exit callback
 * registered.  Returns ORTE_ERROR if fork() fails, or the error from
 * orte_wait_cb().
 * NOTE(review): on either failure the entry stays [false, true], so a
 * later orte_filem_rsh_wait() on this request may never complete unless
 * the caller handles it -- TODO confirm with the (unseen) caller.
 */
static int start_child(char * command,
                       orte_filem_base_request_t *request,
                       int index)
{
    char **argv = NULL;
    int status, ret;

    OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                         "filem:rsh: start_child(): Starting the command [%s]",
                         command));

    /* fork() -> done = false, active = true */
    request->is_done[index] = false;
    request->is_active[index] = true;
    request->exit_status[index] = fork();

    if (request->exit_status[index] == 0) { /* Child */
        /* Redirect stdout to /dev/null */
        freopen("/dev/null", "w", stdout);

        argv = opal_argv_split(command, ' ');
        if (NULL != argv && NULL != argv[0]) {
            status = execvp(argv[0], argv);
        } else {
            /* Empty command (or allocation failure): nothing to exec */
            status = -1;
        }
        opal_output(0, "filem:rsh:start_child Failed to exec child [%s] status = %d\n", command, status);
        /*
         * _exit() rather than exit(): the forked child must not run the
         * parent's atexit handlers or flush the stdio buffers it inherited.
         * The exit status is unchanged.
         */
        _exit(ORTE_ERROR);
    }
    else if (request->exit_status[index] > 0) {
        OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                             "filem:rsh: start_child(): Started Child %d Running command [%s]",
                             request->exit_status[index], command));

        /* Register a callback for when this process exits */
        if (ORTE_SUCCESS != (ret = orte_wait_cb(request->exit_status[index], filem_rsh_waitpid_cb, NULL))) {
            opal_output(0, "filem:rsh: start_child(): Failed to register a waitpid callback for child [%d] executing the command [%s]\n",
                        request->exit_status[index], command);
            return ret;
        }
    }
    else {
        /* fork() failed */
        return ORTE_ERROR;
    }

    return ORTE_SUCCESS;
}
/*
 * Exit callback registered by start_child() through orte_wait_cb().
 *
 * With work_pool_lock held it:
 *   1. finds the work_pool_active entry whose slot is not yet done and whose
 *      exit_status still holds 'pid', stores the exit status there and
 *      marks the slot done / inactive;
 *   2. releases one outgoing slot (cur_num_outgoing);
 *   3. if requests are being held back, moves the oldest one to
 *      work_pool_pending, re-takes the outgoing slot and asks its source
 *      peer for permission to send (a failure is only reported);
 *   4. signals work_pool_cond to wake anyone waiting on a child.
 *
 * 'cbdata' is not used.
 */
static void filem_rsh_waitpid_cb(pid_t pid, int status, void* cbdata)
{
    opal_list_item_t *cursor = NULL;
    orte_filem_rsh_work_pool_item_t *entry = NULL;
    int rc;

    OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                         "filem:rsh: waitpid_cb(): Pid %d finished with status [%d].\n",
                         pid, status));

    OPAL_THREAD_LOCK(&work_pool_lock);

    /* Step 1: record the exit status on the matching active slot */
    cursor = opal_list_get_first(&work_pool_active);
    while( cursor != opal_list_get_end(&work_pool_active) ) {
        orte_filem_base_request_t *req;
        int slot;

        entry = (orte_filem_rsh_work_pool_item_t *)cursor;
        req   = entry->request;
        slot  = entry->index;

        if( false == req->is_done[slot] && pid == req->exit_status[slot] ) {
            req->exit_status[slot] = status;
            /* waitpid() -> done = true, active = false */
            req->is_done[slot]   = true;
            req->is_active[slot] = false;
            OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                                 "filem:rsh: waitpid_cb(): Marked pid %d as complete [status = %d].\n",
                                 pid, status));
            break;
        }
        cursor = opal_list_get_next(cursor);
    }

    /* Step 2: one outgoing transfer has finished */
    --cur_num_outgoing;

    /* Step 3: promote the oldest held request, if there is one */
    if( 0 < opal_list_get_size(&work_pool_held) ) {
        entry = (orte_filem_rsh_work_pool_item_t *)opal_list_remove_first(&work_pool_held);
        ++cur_num_outgoing;

        /* It now waits on the pending list until the peer allows the send */
        opal_list_append(&work_pool_pending, &(entry->super));

        OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                             "filem:rsh: start_command(): Ask permission to send from proc %s (*** Activate Held)",
                             ORTE_NAME_PRINT(&(entry->proc_set.source))));
        rc = orte_filem_rsh_permission_ask(&(entry->proc_set.source), 1);
        if( ORTE_SUCCESS != rc ) {
            opal_output(0, "ERROR: Failed to ask permission!\n");
        }
    }

    /* Step 4: wake up anyone waiting for a child to finish */
    opal_condition_signal(&work_pool_cond);
    OPAL_THREAD_UNLOCK(&work_pool_lock);
}
/******************************
* Path resolution functions
******************************/
/*
* This function is paired with the filem_base_process_get_remote_path_cmd() function on the remote machine
*/
/*
 * Resolve the remote path used for a transfer with 'peer'.
 *
 * This function is paired with the filem_base_process_get_remote_path_cmd()
 * function on the remote machine.
 *
 * @param remote_ref [in/out] Address of the remote path string. It is passed
 *                   by reference to orte_filem_base_get_remote_path(), which
 *                   may update it.
 * @param peer       Process on the remote side.
 * @param flag       [out] Set to ORTE_FILEM_TYPE_DIR for absolute paths,
 *                   otherwise filled in by orte_filem_base_get_remote_path().
 *
 * @return ORTE_SUCCESS, ORTE_ERR_BAD_PARAM if no path was supplied, or the
 *         error returned by orte_filem_base_get_remote_path().
 */
static int orte_filem_rsh_query_remote_path(char **remote_ref, orte_process_name_t *peer, int *flag) {
    /* Guard against dereferencing a missing path string */
    if( NULL == remote_ref || NULL == *remote_ref ) {
        return ORTE_ERR_BAD_PARAM;
    }

    /*
     * If we are given an absolute path for the remote side, then there is
     * nothing to do. If the remote directory does not exist, then scp will
     * error out, which is caught by the filem_rsh_waitpid_cb() function.
     *
     * Assume the remote path is a directory, since if it is just a file then
     * the command will still work as normal.
     */
    if( '/' == (*remote_ref)[0] ) {
        *flag = ORTE_FILEM_TYPE_DIR;
        return ORTE_SUCCESS;
    }

    /* Relative path: let the base function resolve it with the remote side */
    return orte_filem_base_get_remote_path(remote_ref, peer, flag);
}
/******************************
* Permission functions
******************************/
/*
 * Post a persistent, non-blocking receive for ORTE_RML_TAG_FILEM_RSH
 * messages from any peer (ORTE_NAME_WILDCARD). Each message is handed to
 * orte_filem_rsh_permission_callback().
 *
 * Returns ORTE_SUCCESS, or the RML error code (which is also logged).
 */
static int orte_filem_rsh_permission_listener_init(void)
{
    int rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
                                     ORTE_RML_TAG_FILEM_RSH,
                                     ORTE_RML_PERSISTENT,
                                     orte_filem_rsh_permission_callback,
                                     NULL);

    if( ORTE_SUCCESS == rc ) {
        return ORTE_SUCCESS;
    }

    opal_output(mca_filem_rsh_component.super.output_handle,
                "filem:rsh: listener_init: Failed to register the receive callback (%d)",
                rc);
    return rc;
}
/*
 * Cancel the wildcard receive on ORTE_RML_TAG_FILEM_RSH that
 * orte_filem_rsh_permission_listener_init() posted.
 *
 * Returns ORTE_SUCCESS or the error code from orte_rml.recv_cancel().
 * Failures are returned to the caller without being logged here.
 */
static int orte_filem_rsh_permission_listener_cancel(void)
{
    int rc = orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_FILEM_RSH);

    return rc;
}
/*
 * RML receive callback for ORTE_RML_TAG_FILEM_RSH messages.
 *
 * Every message starts with an OPAL_INT flag:
 *  - ORTE_FILEM_RSH_ASK   [flag][num_req]
 *      'sender' asks for permission to send to us. If the incoming limit
 *      (orte_filem_rsh_max_incomming, when > 0) is already reached, the
 *      sender is queued on work_pool_waiting; otherwise one slot is
 *      granted immediately.
 *  - ORTE_FILEM_RSH_ALLOW [flag][num_allowed]
 *      'sender' lets us start num_allowed sends. For each one, the first
 *      work_pool_pending request whose source is 'sender' is moved to
 *      work_pool_active and its child command is started.
 *  - ORTE_FILEM_RSH_DONE  [flag][num_req][status]
 *                         (+ [local_target][remote_target][cmd] if status != 0)
 *      'sender' finished num_req transfers to us: the slots are released,
 *      a failure is reported with orte_show_help(), and the freed slots
 *      are granted to queued peers.
 *
 * An unpack failure stops processing of the message; unknown flags are
 * ignored. 'status', 'tag' and 'cbdata' are not used.
 */
static void orte_filem_rsh_permission_callback(int status,
                                               orte_process_name_t* sender,
                                               opal_buffer_t *buffer,
                                               orte_rml_tag_t tag,
                                               void* cbdata)
{
    orte_filem_rsh_work_pool_item_t *wp_item = NULL;
    opal_list_item_t *item = NULL;
    int ret;
    orte_std_cntr_t n;
    int num_req, num_allowed = 0;
    int perm_flag, i;
    int32_t peer_status = 0;
    orte_ns_cmp_bitmask_t mask = ORTE_NS_CMP_ALL;
    /* Only filled in for a failed DONE; released at 'cleanup' on every path */
    char * local_target = NULL;
    char * remote_target = NULL;
    char * remote_cmd = NULL;

    OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                         "filem:rsh: permission_callback(? ?): Peer %s ...",
                         ORTE_NAME_PRINT(sender)));

    /*
     * Receive the flag indicating if this is:
     * - Asking for permission (ORTE_FILEM_RSH_ASK)
     * - Allowing us to send (ORTE_FILEM_RSH_ALLOW)
     * - Done sending (ORTE_FILEM_RSH_DONE)
     */
    n = 1;
    if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &perm_flag, &n, OPAL_INT))) {
        goto cleanup;
    }

    /* Asking for permission to send */
    if( ORTE_FILEM_RSH_ASK == perm_flag ) {
        OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                             "filem:rsh: permission_callback(ASK): Peer %s Asking permission to send [Used %d of %d]",
                             ORTE_NAME_PRINT(sender),
                             cur_num_incomming,
                             orte_filem_rsh_max_incomming));

        /*
         * Receive the requested amount
         */
        n = 1;
        if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &num_req, &n, OPAL_INT))) {
            goto cleanup;
        }

        /*
         * Determine how many we can allow:
         *  - if none, put the request on the waiting list;
         *  - otherwise tell the peer to start sending now by sending back
         *    the number allowed to be started.
         */
        if( orte_filem_rsh_max_incomming > 0 && orte_filem_rsh_max_incomming < cur_num_incomming + 1) {
            /* Add to the waiting list */
            OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                                 "filem:rsh: permission_callback(ASK): Add Peer %s request to waiting list",
                                 ORTE_NAME_PRINT(sender)));

            wp_item = OBJ_NEW(orte_filem_rsh_work_pool_item_t);
            wp_item->proc_set.source.jobid = sender->jobid;
            wp_item->proc_set.source.vpid  = sender->vpid;
            ORTE_EPOCH_SET(wp_item->proc_set.source.epoch,sender->epoch);

            opal_list_append(&work_pool_waiting, &(wp_item->super));
        }
        /* Start the transfer immediately */
        else {
            /*
             * Allow only one file request at a time.
             * orte_filem_rsh_start_command() only asks for one anyway.
             * JJH: Look into permission for multiple file permissions at a time
             */
            num_allowed = 1;
            cur_num_incomming += 1;

            OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                                 "filem:rsh: permission_callback(ASK): Respond to Peer %s with %d",
                                 ORTE_NAME_PRINT(sender), num_allowed));

            permission_send_num_allowed(sender, num_allowed);
        }
    }
    /* Allowing us to start some number of sends */
    else if( ORTE_FILEM_RSH_ALLOW == perm_flag ) {
        OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                             "filem:rsh: permission_callback(ALLOW): Peer %s Allowing me to send",
                             ORTE_NAME_PRINT(sender)));

        /*
         * Receive the allowed transmit amount
         */
        n = 1;
        if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &num_allowed, &n, OPAL_INT))) {
            goto cleanup;
        }

        /*
         * For each allocated spot for transmit
         * - Get a pending request directed at this peer
         * - Start the pending request
         */
        for(i = 0; i < num_allowed; ++i ) {
            if( 0 == opal_list_get_size(&work_pool_pending) ) {
                OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                                     "filem:rsh: permission_callback(ALLOW): No more pending sends to peer %s...",
                                     ORTE_NAME_PRINT(sender)));
                break;
            }

            for (item  = opal_list_get_first( &work_pool_pending);
                 item != opal_list_get_end(   &work_pool_pending);
                 item  = opal_list_get_next(   item) ) {
                wp_item = (orte_filem_rsh_work_pool_item_t *)item;
                if (OPAL_EQUAL == orte_util_compare_name_fields(mask, sender, &wp_item->proc_set.source)) {
                    break;
                }
            }

            /*
             * No pending request is addressed to this peer. Stop here:
             * wp_item would otherwise point at an unrelated request that is
             * still linked on the pending list, and appending it to the
             * active list would corrupt both lists.
             */
            if( item == opal_list_get_end(&work_pool_pending) ) {
                OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                                     "filem:rsh: permission_callback(ALLOW): Unable to find message on the pending list\n"));
                break;
            }

            opal_list_remove_item( &work_pool_pending, item);

            OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                                 "filem:rsh: permission_callback(ALLOW): Starting to send to peer %s... (# pending = %d)",
                                 ORTE_NAME_PRINT(sender), (int)opal_list_get_size(&work_pool_pending)));

            wp_item->active = true;
            opal_list_append(&work_pool_active, &(wp_item->super));
            if( ORTE_SUCCESS != (ret = start_child(wp_item->command,
                                                   wp_item->request,
                                                   wp_item->index)) ) {
                goto cleanup;
            }
        }
    }
    /* Peer said they are done sending one or more files */
    else if( ORTE_FILEM_RSH_DONE == perm_flag ) {
        OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                             "filem:rsh: permission_callback(DONE): Peer %s is done sending to me",
                             ORTE_NAME_PRINT(sender)));

        /*
         * Receive the number of open slots
         */
        n = 1;
        if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &num_req, &n, OPAL_INT))) {
            goto cleanup;
        }

        cur_num_incomming -= num_req;

        /*
         * Receive the exit status
         */
        n = 1;
        if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &peer_status, &n, OPAL_INT32))) {
            goto cleanup;
        }

        /* On failure the peer also sends what it was running, for the report */
        if( peer_status != 0 ) {
            n = 1;
            if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &local_target, &n, OPAL_STRING))) {
                ORTE_ERROR_LOG(ret);
                goto cleanup;
            }
            n = 1;
            if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &remote_target, &n, OPAL_STRING))) {
                ORTE_ERROR_LOG(ret);
                goto cleanup;
            }
            n = 1;
            if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &remote_cmd, &n, OPAL_STRING))) {
                ORTE_ERROR_LOG(ret);
                goto cleanup;
            }

            orte_show_help("help-orte-filem-rsh.txt",
                           "orte-filem-rsh:remote-get-failed",
                           true, ORTE_NAME_PRINT(sender), peer_status,
                           local_target,
                           remote_target,
                           remote_cmd);
        }

        /*
         * For each open slot, notify a waiting peer that it may send
         */
        for(i = 0; i < num_req; ++i ) {
            item = opal_list_get_first( &work_pool_waiting);
            if( item != opal_list_get_end( &work_pool_waiting) ) {
                wp_item = (orte_filem_rsh_work_pool_item_t *)item;
                num_allowed = 1;
                cur_num_incomming += 1;
                opal_list_remove_item(&work_pool_waiting, item);

                permission_send_num_allowed(&(wp_item->proc_set.source), num_allowed);

                OBJ_RELEASE(wp_item);
            }
        }
    }

 cleanup:
    /* free(NULL) is a no-op, so this is safe whether or not anything was unpacked */
    free(local_target);
    free(remote_target);
    free(remote_cmd);
    return;
}
/*
 * Ask 'source' for permission to start 'num_sends' transfers.
 *
 * Sends [ORTE_FILEM_RSH_ASK][num_sends] (both OPAL_INT) on
 * ORTE_RML_TAG_FILEM_RSH.
 *
 * Returns ORTE_SUCCESS, or the first pack/send error encountered.
 */
static int orte_filem_rsh_permission_ask(orte_process_name_t* source,
                                         int num_sends)
{
    opal_buffer_t msg;
    int flag = ORTE_FILEM_RSH_ASK;
    int rc;

    OBJ_CONSTRUCT(&msg, opal_buffer_t);

    rc = opal_dss.pack(&msg, &flag, 1, OPAL_INT);
    if( ORTE_SUCCESS == rc ) {
        rc = opal_dss.pack(&msg, &num_sends, 1, OPAL_INT);
    }
    if( ORTE_SUCCESS == rc ) {
        /* send_buffer() reports failure with a negative value */
        rc = orte_rml.send_buffer(source, &msg, ORTE_RML_TAG_FILEM_RSH, 0);
        if( 0 <= rc ) {
            rc = ORTE_SUCCESS;
        }
    }

    OBJ_DESTRUCT(&msg);
    return rc;
}
/*
 * Tell 'peer' that transfers it permitted have finished.
 *
 * Message on ORTE_RML_TAG_FILEM_RSH:
 *   [ORTE_FILEM_RSH_DONE (OPAL_INT)][num_avail (OPAL_INT)][status (OPAL_INT32)]
 * followed, only when status != 0, by
 *   [local_target][remote_target][command] (OPAL_STRING each)
 * so the peer can report what failed.
 *
 * Returns ORTE_SUCCESS, or the first pack/send error encountered.
 */
static int permission_send_done(orte_process_name_t* peer, int num_avail,
                                int32_t status,
                                char * local_target,
                                char * remote_target,
                                char * command) {
    opal_buffer_t msg;
    int flag = ORTE_FILEM_RSH_DONE;
    int rc;

    OBJ_CONSTRUCT(&msg, opal_buffer_t);

    /* Fixed header */
    rc = opal_dss.pack(&msg, &flag, 1, OPAL_INT);
    if( ORTE_SUCCESS == rc ) {
        rc = opal_dss.pack(&msg, &num_avail, 1, OPAL_INT);
    }
    if( ORTE_SUCCESS == rc ) {
        rc = opal_dss.pack(&msg, &status, 1, OPAL_INT32);
    }

    /* Failure details */
    if( ORTE_SUCCESS == rc && 0 != status ) {
        rc = opal_dss.pack(&msg, &local_target, 1, OPAL_STRING);
        if( ORTE_SUCCESS == rc ) {
            rc = opal_dss.pack(&msg, &remote_target, 1, OPAL_STRING);
        }
        if( ORTE_SUCCESS == rc ) {
            rc = opal_dss.pack(&msg, &command, 1, OPAL_STRING);
        }
    }

    if( ORTE_SUCCESS == rc ) {
        /* send_buffer() reports failure with a negative value */
        rc = orte_rml.send_buffer(peer, &msg, ORTE_RML_TAG_FILEM_RSH, 0);
        if( 0 <= rc ) {
            rc = ORTE_SUCCESS;
        }
    }

    OBJ_DESTRUCT(&msg);
    return rc;
}
/*
 * Grant 'peer' permission to start 'num_allowed' transfers to us.
 *
 * Sends [ORTE_FILEM_RSH_ALLOW][num_allowed] (both OPAL_INT) on
 * ORTE_RML_TAG_FILEM_RSH.
 *
 * Returns ORTE_SUCCESS, or the first pack/send error encountered.
 */
static int permission_send_num_allowed(orte_process_name_t* peer, int num_allowed)
{
    opal_buffer_t msg;
    int flag = ORTE_FILEM_RSH_ALLOW;
    int rc;

    OBJ_CONSTRUCT(&msg, opal_buffer_t);

    rc = opal_dss.pack(&msg, &flag, 1, OPAL_INT);
    if( ORTE_SUCCESS == rc ) {
        rc = opal_dss.pack(&msg, &num_allowed, 1, OPAL_INT);
    }
    if( ORTE_SUCCESS == rc ) {
        /* send_buffer() reports failure with a negative value */
        rc = orte_rml.send_buffer(peer, &msg, ORTE_RML_TAG_FILEM_RSH, 0);
        if( 0 <= rc ) {
            rc = ORTE_SUCCESS;
        }
    }

    OBJ_DESTRUCT(&msg);
    return rc;
}