1
1
openmpi/orte/mca/filem/rsh/filem_rsh_module.c
Ralph Castain fceabb2498 Update libevent to the 2.0 series, currently at 2.0.7rc. We will update to their final release when it becomes available. There are currently known errors in unused portions of the libevent code. This revision passes the IBM test suite on a Linux machine and on a standalone Mac.
This is a fairly intrusive change, but outside of the moving of opal/event to opal/mca/event, the only changes involved (a) changing all calls to opal_event functions to reflect the new framework instead, and (b) ensuring that all opal_event_t objects are properly constructed since they are now true opal_objects.

Note: Shiqing has just returned from vacation and has not yet had a chance to complete the Windows integration. Thus, this commit almost certainly breaks Windows support on the trunk. However, I want this to have a chance to soak for as long as possible before I become less available a week from today (going to be at a class for 5 days, and thus will only be sparingly available) so we can find and fix any problems.

The biggest change is moving the libevent code from opal/event to a new opal/mca/event framework. This was done to make it much easier to update libevent in the future. New versions can be inserted as a new component and tested in parallel with the current version until validated; then we can remove the earlier version if we so choose. This is a statically built framework à la installdirs, so only one component will build at a time. There is no selection logic - the sole compiled component simply loads its function pointers into the opal_event struct.

I have gone through the code base and converted all the libevent calls I could find. However, I cannot compile or test every environment. It is therefore quite likely that errors remain in the system. Please keep an eye open for two things:

1. compile-time errors: these will be obvious as calls to the old functions (e.g., opal_evtimer_new) must be replaced by the new framework APIs (e.g., opal_event.evtimer_new)

2. run-time errors: these will likely show up as segfaults due to missing constructors on opal_event_t objects. It appears to have become common practice to "init" an opal_event_t by simply using memset to zero it out. This will no longer work - you must either OBJ_NEW or OBJ_CONSTRUCT an opal_event_t. I tried to catch these cases, but may have missed some. Believe me, you'll know when you hit it.

There is also the issue of the new libevent "no recursion" behavior. As I described in a recent email, we will have to discuss this and figure out what, if anything, we need to do.

This commit was SVN r23925.
2010-10-24 18:35:54 +00:00

1663 lines
58 KiB
C

/*
* Copyright (c) 2004-2010 The Trustees of Indiana University.
* All rights reserved.
* Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
* All rights reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
/*
*
*/
#include "orte_config.h"
#ifdef HAVE_STRING_H
#include <string.h>
#endif
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/wait.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif /* HAVE_UNISTD_H */
#include "opal/mca/mca.h"
#include "opal/mca/base/base.h"
#include "opal/mca/base/mca_base_param.h"
#include "opal/mca/event/event.h"
#include "opal/util/opal_sos.h"
#include "orte/constants.h"
#include "orte/util/show_help.h"
#include "opal/util/argv.h"
#include "opal/util/output.h"
#include "opal/util/opal_environ.h"
#include "opal/util/basename.h"
#include "opal/threads/mutex.h"
#include "opal/threads/condition.h"
#include "orte/util/name_fns.h"
#include "orte/util/proc_info.h"
#include "orte/runtime/orte_globals.h"
#include "orte/runtime/orte_wait.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/rml/rml_types.h"
#include "orte/mca/filem/filem.h"
#include "orte/mca/filem/base/base.h"
#include "filem_rsh.h"
/**********
* Local Function and Variable Declarations
**********/
/* Build and queue one copy command per (file set, process set) pair of a PUT/GET request */
static int orte_filem_rsh_start_copy(orte_filem_base_request_t *request);
/* Build and queue one remote 'rm' command per process set of an RM request */
static int orte_filem_rsh_start_rm(orte_filem_base_request_t *request);
/* Wrap a command in a work pool item and place it on the held or pending list */
static int orte_filem_rsh_start_command(orte_filem_base_process_set_t *proc_set,
orte_filem_base_file_set_t *file_set,
char * command,
orte_filem_base_request_t *request,
int index);
/* fork()/exec the command for slot 'index' of 'request' and register filem_rsh_waitpid_cb.
 * NOTE(review): not called in the visible part of this file; presumably invoked once
 * permission to send has been granted -- confirm. */
static int start_child(char * command,
orte_filem_base_request_t *request,
int index);
/* Resolve *remote_ref to the path on 'proc's node and report its type through *flag
 * (ORTE_FILEM_TYPE_DIR / ORTE_FILEM_TYPE_UNKNOWN / other -- as used by the callers;
 * definition is outside this view) */
static int orte_filem_rsh_query_remote_path(char **remote_ref,
orte_process_name_t *proc,
int *flag);
/* Child-exit callback registered through orte_wait_cb() in start_child() */
static void filem_rsh_waitpid_cb(pid_t pid, int status, void* cbdata);
/* Permission to send functionality */
static int orte_filem_rsh_permission_listener_init(void);
static int orte_filem_rsh_permission_listener_cancel(void);
static void orte_filem_rsh_permission_callback(int status,
orte_process_name_t* sender,
opal_buffer_t *buffer,
orte_rml_tag_t tag,
void* cbdata);
/* Ask 'sender' for permission to perform 'num_sends' transfers (called with 1 in start_command) */
static int orte_filem_rsh_permission_ask(orte_process_name_t* sender, int num_sends);
/* Tell 'sender' that a transfer finished (called from orte_filem_rsh_wait) */
static int permission_send_done(orte_process_name_t* sender, int num_avail,
int32_t exit_status,
char * local_target,
char * remote_target,
char * command);
static int permission_send_num_allowed(orte_process_name_t* sender, int num_allowed);
/*************
* Local work pool structure
*************/
/* NOTE(review): not referenced in the visible part of this file; presumably counts
 * transfers into this process -- confirm against the permission callback. */
int cur_num_incomming = 0;
/* Number of sends that have been moved to the pending list (incremented in
 * orte_filem_rsh_start_command); compared against orte_filem_rsh_max_outgoing */
int cur_num_outgoing = 0;
/* Reset to false in orte_filem_rsh_module_init() */
static bool work_pool_all_done = false;
/* Lock/condition pair: orte_filem_rsh_wait() sleeps on work_pool_cond while
 * holding work_pool_lock; filem_rsh_waitpid_cb() takes the same lock */
static opal_mutex_t work_pool_lock;
static opal_condition_t work_pool_cond;
/*
* work_pool_waiting:
* - processes that are waiting for my permission to put() to me
*/
opal_list_t work_pool_waiting;
/*
* work_pool_held:
* - requests that are held before asking permission to reduce load
*/
opal_list_t work_pool_held;
/*
* work_pool_pending:
* - put requests waiting on permission to send to peer
*/
opal_list_t work_pool_pending;
/*
* work_pool_active:
* - put requests currently sending
*/
opal_list_t work_pool_active;
/*
 * One queued transfer/removal command.  Items live on exactly one of the
 * work_pool_* lists at a time (appended in orte_filem_rsh_start_command,
 * removed from work_pool_active in orte_filem_rsh_wait).
 */
struct orte_filem_rsh_work_pool_item_t {
/** This is an object, so must have a super */
opal_list_item_t super;
/** Command to exec (owned: strdup'd on creation, freed by the destructor) */
char * command;
/** Pointer to FileM Request (a reference is retained on creation and
 *  released by the destructor) */
orte_filem_base_request_t *request;
/** Index into the request's is_done/is_active/exit_status arrays */
int index;
/** Process Set (copy of the source/sink names) */
orte_filem_base_process_set_t proc_set;
/** File Set (strdup'd copies of the targets, when a file set was given) */
orte_filem_base_file_set_t file_set;
/** If this item is active */
bool active;
};
typedef struct orte_filem_rsh_work_pool_item_t orte_filem_rsh_work_pool_item_t;
OBJ_CLASS_DECLARATION(orte_filem_rsh_work_pool_item_t);
void orte_filem_rsh_work_pool_construct(orte_filem_rsh_work_pool_item_t *obj);
void orte_filem_rsh_work_pool_destruct( orte_filem_rsh_work_pool_item_t *obj);
/* Register the class with OPAL's object system: parent opal_list_item_t,
 * plus the constructor/destructor defined below */
OBJ_CLASS_INSTANCE(orte_filem_rsh_work_pool_item_t,
opal_list_item_t,
orte_filem_rsh_work_pool_construct,
orte_filem_rsh_work_pool_destruct);
/*
 * Constructor for a work pool item: no command, no request, slot 0,
 * inactive, with freshly constructed embedded process and file sets.
 */
void orte_filem_rsh_work_pool_construct(orte_filem_rsh_work_pool_item_t *obj)
{
    obj->command = NULL;
    obj->request = NULL;
    obj->index   = 0;
    obj->active  = false;

    OBJ_CONSTRUCT(&obj->proc_set, orte_filem_base_process_set_t);
    OBJ_CONSTRUCT(&obj->file_set, orte_filem_base_file_set_t);
}
/*
 * Destructor for a work pool item: frees the owned command string, drops
 * the request reference taken when the item was queued, and destructs the
 * embedded process and file sets.
 */
void orte_filem_rsh_work_pool_destruct( orte_filem_rsh_work_pool_item_t *obj)
{
    /* free(NULL) is a no-op, so the command needs no guard */
    free(obj->command);
    obj->command = NULL;

    /* OBJ_RELEASE must not see a NULL object */
    if (NULL != obj->request) {
        OBJ_RELEASE(obj->request);
        obj->request = NULL;
    }

    obj->index = 0;

    OBJ_DESTRUCT(&obj->proc_set);
    OBJ_DESTRUCT(&obj->file_set);

    obj->active = false;
}
/*
* Rsh module
*/
/*
 * Function table handed out by orte_filem_rsh_component_query().
 * The entries are positional, so their order must follow the member order of
 * orte_filem_base_module_t (declared in orte/mca/filem/filem.h, not shown here).
 */
static orte_filem_base_module_t loc_module = {
/** Initialization Function */
orte_filem_rsh_module_init,
/** Finalization Function */
orte_filem_rsh_module_finalize,
/** Blocking / non-blocking put */
orte_filem_rsh_put,
orte_filem_rsh_put_nb,
/** Blocking / non-blocking get */
orte_filem_rsh_get,
orte_filem_rsh_get_nb,
/** Blocking / non-blocking remove */
orte_filem_rsh_rm,
orte_filem_rsh_rm_nb,
/** Wait on one request / on a list of requests */
orte_filem_rsh_wait,
orte_filem_rsh_wait_all
};
/*
* MCA Functions
*/
/**
 * Component query: return the rsh module and the priority configured on
 * the component.
 *
 * @param[out] module    Set to the static rsh module table.
 * @param[out] priority  Set to mca_filem_rsh_component.super.priority.
 * @return Always ORTE_SUCCESS.
 */
int orte_filem_rsh_component_query(mca_base_module_t **module, int *priority)
{
    OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                         "filem:rsh: component_query()"));

    *priority = mca_filem_rsh_component.super.priority;
    *module = (mca_base_module_t *) &loc_module;

    return ORTE_SUCCESS;
}
/**
 * Module initialization.
 *
 * Clears the global "active" flag, constructs the four work pools and their
 * lock/condition pair, then starts the permission listener and the base
 * receive.
 *
 * @return ORTE_SUCCESS, or the error from the listener / base receive start.
 */
int orte_filem_rsh_module_init(void)
{
    int rc;

    orte_filem_base_is_active = false;

    OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                         "filem:rsh: module_init()"));

    /* Work pools plus the lock/condition that guard them */
    OBJ_CONSTRUCT(&work_pool_waiting, opal_list_t);
    OBJ_CONSTRUCT(&work_pool_held,    opal_list_t);
    OBJ_CONSTRUCT(&work_pool_pending, opal_list_t);
    OBJ_CONSTRUCT(&work_pool_active,  opal_list_t);
    OBJ_CONSTRUCT(&work_pool_lock, opal_mutex_t);
    OBJ_CONSTRUCT(&work_pool_cond, opal_condition_t);
    work_pool_all_done = false;

    /* Listener for permission-to-send traffic */
    rc = orte_filem_rsh_permission_listener_init();
    if (ORTE_SUCCESS != rc) {
        opal_output(mca_filem_rsh_component.super.output_handle,
                    "filem:rsh:init Failed to start listener\n");
        return rc;
    }

    /* Base receive */
    rc = orte_filem_base_comm_start();
    if (ORTE_SUCCESS != rc) {
        opal_output(mca_filem_rsh_component.super.output_handle,
                    "filem:rsh:init Failed to start base receive\n");
    }

    return rc;
}
int orte_filem_rsh_module_finalize(void)
{
opal_list_item_t *item = NULL;
OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
"filem:rsh: module_finalize()"));
/*
* Make sure all active requests are completed
*/
if( orte_filem_base_is_active ) {
while(0 < opal_list_get_size(&work_pool_active) ) {
opal_progress();
}
}
orte_filem_base_is_active = false;
/*
* Stop the listeners
*/
orte_filem_rsh_permission_listener_cancel();
/*
* Stop the base receive
*/
orte_filem_base_comm_stop();
/*
* Deallocate the work pools
*/
while( NULL != (item = opal_list_remove_first(&work_pool_waiting)) ) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&work_pool_waiting);
while( NULL != (item = opal_list_remove_first(&work_pool_held)) ) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&work_pool_held);
while( NULL != (item = opal_list_remove_first(&work_pool_pending)) ) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&work_pool_pending);
while( NULL != (item = opal_list_remove_first(&work_pool_active)) ) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&work_pool_active);
OBJ_DESTRUCT(&work_pool_lock);
OBJ_DESTRUCT(&work_pool_cond);
return ORTE_SUCCESS;
}
/******************
* Local functions
******************/
/**
 * Blocking put: prepare @p request as a PUT, launch its copies and wait
 * for all of them to complete.
 *
 * @param request  Request describing the file and process sets to move.
 * @return ORTE_SUCCESS, or the first error from prepare, start or wait.
 */
int orte_filem_rsh_put(orte_filem_base_request_t *request)
{
    int rc;

    orte_filem_base_is_active = true;

    rc = orte_filem_base_prepare_request(request, ORTE_FILEM_MOVE_TYPE_PUT);
    if (ORTE_SUCCESS != rc) {
        opal_output(mca_filem_rsh_component.super.output_handle,
                    "filem:rsh: put(): Failed to prepare the request structure (%d)", rc);
    } else if (ORTE_SUCCESS != (rc = orte_filem_rsh_start_copy(request))) {
        opal_output(mca_filem_rsh_component.super.output_handle,
                    "filem:rsh: put(): Failed to post the request (%d)", rc);
    } else if (ORTE_SUCCESS != (rc = orte_filem_rsh_wait(request))) {
        opal_output(mca_filem_rsh_component.super.output_handle,
                    "filem:rsh: put(): Failed to wait on the request (%d)", rc);
    }

    /* Remain flagged active only while transfers are still in flight */
    orte_filem_base_is_active = (0 < opal_list_get_size(&work_pool_active));

    return rc;
}
/**
 * Non-blocking put: prepare @p request as a PUT and launch its copies
 * without waiting; completion is collected later via orte_filem_rsh_wait().
 *
 * @param request  Request describing the file and process sets to move.
 * @return ORTE_SUCCESS, or the first error from prepare or start.
 */
int orte_filem_rsh_put_nb(orte_filem_base_request_t *request)
{
    int rc;

    orte_filem_base_is_active = true;

    rc = orte_filem_base_prepare_request(request, ORTE_FILEM_MOVE_TYPE_PUT);
    if (ORTE_SUCCESS != rc) {
        opal_output(mca_filem_rsh_component.super.output_handle,
                    "filem:rsh: put(): Failed to prepare the request structure (%d)", rc);
    } else if (ORTE_SUCCESS != (rc = orte_filem_rsh_start_copy(request))) {
        opal_output(mca_filem_rsh_component.super.output_handle,
                    "filem:rsh: put(): Failed to post the request (%d)", rc);
    }

    /* Remain flagged active only while transfers are still in flight */
    orte_filem_base_is_active = (0 < opal_list_get_size(&work_pool_active));

    return rc;
}
/**
 * Blocking get: prepare @p request as a GET, launch its copies and wait
 * for all of them to complete.
 *
 * @param request  Request describing the file and process sets to move.
 * @return ORTE_SUCCESS, or the first error from prepare, start or wait.
 */
int orte_filem_rsh_get(orte_filem_base_request_t *request)
{
    int rc;

    orte_filem_base_is_active = true;

    rc = orte_filem_base_prepare_request(request, ORTE_FILEM_MOVE_TYPE_GET);
    if (ORTE_SUCCESS != rc) {
        opal_output(mca_filem_rsh_component.super.output_handle,
                    "filem:rsh: get(): Failed to prepare the request structure (%d)", rc);
    } else if (ORTE_SUCCESS != (rc = orte_filem_rsh_start_copy(request))) {
        opal_output(mca_filem_rsh_component.super.output_handle,
                    "filem:rsh: get(): Failed to post the request (%d)", rc);
    } else if (ORTE_SUCCESS != (rc = orte_filem_rsh_wait(request))) {
        opal_output(mca_filem_rsh_component.super.output_handle,
                    "filem:rsh: get(): Failed to wait on the request (%d)", rc);
    }

    /* Remain flagged active only while transfers are still in flight */
    orte_filem_base_is_active = (0 < opal_list_get_size(&work_pool_active));

    return rc;
}
/**
 * Non-blocking get: prepare @p request as a GET and launch its copies
 * without waiting; completion is collected later via orte_filem_rsh_wait().
 *
 * @param request  Request describing the file and process sets to move.
 * @return ORTE_SUCCESS, or the first error from prepare or start.
 */
int orte_filem_rsh_get_nb(orte_filem_base_request_t *request)
{
    int rc;

    orte_filem_base_is_active = true;

    rc = orte_filem_base_prepare_request(request, ORTE_FILEM_MOVE_TYPE_GET);
    if (ORTE_SUCCESS != rc) {
        opal_output(mca_filem_rsh_component.super.output_handle,
                    "filem:rsh: get(): Failed to prepare the request structure (%d)", rc);
    } else if (ORTE_SUCCESS != (rc = orte_filem_rsh_start_copy(request))) {
        opal_output(mca_filem_rsh_component.super.output_handle,
                    "filem:rsh: get(): Failed to post the request (%d)", rc);
    }

    /* Remain flagged active only while transfers are still in flight */
    orte_filem_base_is_active = (0 < opal_list_get_size(&work_pool_active));

    return rc;
}
/**
 * Blocking remove: prepare @p request as an RM, launch the remote 'rm'
 * commands and wait for all of them to complete.
 *
 * @param request  Request describing the files and processes involved.
 * @return ORTE_SUCCESS, or the first error from prepare, start or wait.
 */
int orte_filem_rsh_rm(orte_filem_base_request_t *request)
{
    int rc;

    orte_filem_base_is_active = true;

    rc = orte_filem_base_prepare_request(request, ORTE_FILEM_MOVE_TYPE_RM);
    if (ORTE_SUCCESS != rc) {
        opal_output(mca_filem_rsh_component.super.output_handle,
                    "filem:rsh: rm(): Failed to prepare on the request (%d)", rc);
    } else if (ORTE_SUCCESS != (rc = orte_filem_rsh_start_rm(request))) {
        opal_output(mca_filem_rsh_component.super.output_handle,
                    "filem:rsh: rm(): Failed to start the request (%d)", rc);
    } else if (ORTE_SUCCESS != (rc = orte_filem_rsh_wait(request))) {
        opal_output(mca_filem_rsh_component.super.output_handle,
                    "filem:rsh: rm(): Failed to wait on the request (%d)", rc);
    }

    /* Remain flagged active only while transfers are still in flight */
    orte_filem_base_is_active = (0 < opal_list_get_size(&work_pool_active));

    return rc;
}
/**
 * Non-blocking remove: prepare @p request as an RM and launch the remote
 * 'rm' commands without waiting for them.
 *
 * @param request  Request describing the files and processes involved.
 * @return ORTE_SUCCESS, or the first error from prepare or start.
 */
int orte_filem_rsh_rm_nb(orte_filem_base_request_t *request)
{
    int rc;

    orte_filem_base_is_active = true;

    rc = orte_filem_base_prepare_request(request, ORTE_FILEM_MOVE_TYPE_RM);
    if (ORTE_SUCCESS != rc) {
        opal_output(mca_filem_rsh_component.super.output_handle,
                    "filem:rsh: rm_nb(): Failed to prepare on the request (%d)", rc);
    } else if (ORTE_SUCCESS != (rc = orte_filem_rsh_start_rm(request))) {
        opal_output(mca_filem_rsh_component.super.output_handle,
                    "filem:rsh: rm_nb(): Failed to start on the request (%d)", rc);
    }

    /* Remain flagged active only while transfers are still in flight */
    orte_filem_base_is_active = (0 < opal_list_get_size(&work_pool_active));

    return rc;
}
/*
 * Block until every move slot (0 .. request->num_mv-1) of 'request' is finished.
 *
 * Each slot i is tracked by the pair (is_done[i], is_active[i]):
 *   [false, true ] - child command still running (state set by start_child())
 *   [true,  false] - child finished but not yet collected here
 *                    (presumably set by filem_rsh_waitpid_cb -- confirm)
 *   [true,  true ] - fully done: collected here, or resolved without a child
 * Collecting a slot removes its item from work_pool_active, sends a "done"
 * notice to the source peer via permission_send_done() and releases the item.
 * Between passes the function sleeps on work_pool_cond.
 *
 * @param request  Request whose slots are waited on.
 * @return ORTE_SUCCESS, or the last negative value in request->exit_status[].
 */
int orte_filem_rsh_wait(orte_filem_base_request_t *request)
{
int exit_status = ORTE_SUCCESS;
orte_filem_rsh_work_pool_item_t *wp_item = NULL;
opal_list_item_t *item = NULL;
int i;
int num_finished = 0;
bool found_match = false;
/*
* Stage 0: A pass to see if the request is already done
*/
for(i = 0; i < request->num_mv; ++i) {
if( request->is_done[i] == true &&
request->is_active[i] == true) {
++num_finished;
}
}
while(num_finished < request->num_mv ) {
/*
* Stage 1: Complete all active requests
*/
for(i = 0; i < request->num_mv; ++i) {
/* If the child is still executing (active) then continue
* checking other children
*/
if( request->is_done[i] == false &&
request->is_active[i] == true) {
continue;
}
/* If the child is done executing (!active), but has not been
* collected then do so
*/
else if( request->is_done[i] == true &&
request->is_active[i] == false) {
/*
* Find the reference in the active pool
* NOTE(review): this walk is done without work_pool_lock, while
* filem_rsh_waitpid_cb() walks the same list under that lock --
* confirm this is safe in threaded builds.
*/
found_match = false;
for (item = opal_list_get_first( &work_pool_active);
item != opal_list_get_end( &work_pool_active);
item = opal_list_get_next( item) ) {
wp_item = (orte_filem_rsh_work_pool_item_t *)item;
if(request == wp_item->request &&
i == wp_item->index) {
found_match = true;
break;
}
}
/* If no match then assume on the pending list, and continue */
if( !found_match ) {
continue;
}
OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
"filem:rsh: wait(): Transfer complete. Cleanup\n"));
opal_list_remove_item(&work_pool_active, item);
/* Mark as fully done [active = true, done = true]
* It does not make complete sense to call this 'active' when
* it is obviously not, but this is a state [true, true] that
* should only be reached if the transfer has finished
* completely.
*/
request->is_done[i] = true;
request->is_active[i] = true;
/* Tell peer we are finished with a send
* (the return value of permission_send_done() is not checked) */
permission_send_done(&(wp_item->proc_set.source), 1, request->exit_status[i],
wp_item->file_set.local_target,
wp_item->file_set.remote_target,
wp_item->command);
OBJ_RELEASE(wp_item);
wp_item = NULL;
++num_finished;
}
}
/*
* Wait for a child to complete
*/
if( num_finished < request->num_mv ) {
OPAL_THREAD_LOCK(&work_pool_lock);
opal_condition_wait(&work_pool_cond,
&work_pool_lock);
OPAL_THREAD_UNLOCK(&work_pool_lock);
}
}
/*
* Stage 2: Determine the return value
* (later negative statuses overwrite earlier ones)
*/
for(i = 0; i < request->num_mv; ++i) {
if( request->exit_status[i] < 0 ) {
exit_status = request->exit_status[i];
}
}
/* Module stays flagged active while any transfer remains in flight */
if( 0 < opal_list_get_size(&work_pool_active) ) {
orte_filem_base_is_active = true;
} else {
orte_filem_base_is_active = false;
}
return exit_status;
}
/**
 * Wait on every request in @p request_list, in list order.
 *
 * When orte_filem_rsh_progress_meter is positive, a completion percentage
 * is printed each time it advances by at least that many points (the first
 * completed request always reports).
 *
 * @param request_list  List of orte_filem_base_request_t items.
 * @return ORTE_SUCCESS, or the error of the first request whose wait failed
 *         (later requests are then not waited on).
 */
int orte_filem_rsh_wait_all(opal_list_t * request_list)
{
    int rc = ORTE_SUCCESS;
    opal_list_item_t *cursor = NULL;
    double pct, pct_last = 0.0;
    int n_total = (int) opal_list_get_size(request_list);
    int n_done = 0;

    for (cursor = opal_list_get_first(request_list);
         cursor != opal_list_get_end(request_list);
         cursor = opal_list_get_next(cursor)) {
        int wait_rc = orte_filem_rsh_wait((orte_filem_base_request_t *) cursor);

        if (ORTE_SUCCESS != wait_rc) {
            opal_output(mca_filem_rsh_component.super.output_handle,
                        "filem:rsh: wait_all(): Wait failed (%d)", wait_rc);
            rc = wait_rc;
            break;
        }

        if (OPAL_UNLIKELY(orte_filem_rsh_progress_meter > 0)) {
            ++n_done;
            /* Fraction remaining, then flipped into percent completed */
            pct = (n_total - n_done) / (1.0 * n_total);
            pct = (pct - 1) * (-100.0);
            if (pct >= (pct_last + orte_filem_rsh_progress_meter) || pct_last == 0.0) {
                pct_last = pct;
                opal_output(0, "filem:rsh: progress: %10.2f %c Finished\n",
                            pct, '%');
            }
        }
    }

    /* Remain flagged active only while transfers are still in flight */
    orte_filem_base_is_active = (0 < opal_list_get_size(&work_pool_active));

    return rc;
}
/**************************
* Support functions
**************************/
/*
 * Launch one copy per (file set, process set) pair of a PUT or GET request.
 *
 * Slot cur_index of request->is_done/is_active/exit_status is filled in for
 * every pair, in file-set-major order.
 *  - Some pairs need no transfer or fail a precondition: source missing on a
 *    PUT, destination already present on a non-local GET, or unknown remote
 *    file type. These are marked finished ([done=true, active=true]) with
 *    exit_status 0 or -1.
 *  - Every other pair gets a cp/rsh command line, which is handed to
 *    orte_filem_rsh_start_command() for queuing.
 *
 * @param request  Request already prepared for PUT or GET.
 * @return ORTE_SUCCESS; ORTE_ERR_OUT_OF_RESOURCE if a path or command string
 *         could not be built; otherwise the error from node-name lookup,
 *         remote-path query or command start.
 */
static int orte_filem_rsh_start_copy(orte_filem_base_request_t *request) {
    int ret = ORTE_SUCCESS, exit_status = ORTE_SUCCESS;
    opal_list_item_t *f_item = NULL;
    opal_list_item_t *p_item = NULL;
    char *remote_machine = NULL;
    char *remote_file = NULL;
    char *command = NULL;
    char *dir_arg = NULL;
    int cur_index = 0;
    int len;

    /* For each file pair */
    for (f_item = opal_list_get_first( &request->file_sets);
         f_item != opal_list_get_end( &request->file_sets);
         f_item = opal_list_get_next( f_item) ) {
        orte_filem_base_file_set_t * f_set = (orte_filem_base_file_set_t*)f_item;

        /* For each process set */
        for (p_item = opal_list_get_first( &request->process_sets);
             p_item != opal_list_get_end( &request->process_sets);
             p_item = opal_list_get_next( p_item) ) {
            orte_filem_base_process_set_t * p_set = (orte_filem_base_process_set_t*)p_item;

            /*
             * If the source and sink are the same, then this is a local operation
             * Further if the files are the same, then nothing to do
             *
             * NOTE(review): strncmp() bounded by strlen(remote_target) is a
             * prefix test, so it also fires when remote_target is merely a
             * leading substring of local_target (e.g. "/tmp/fo" vs "/tmp/foo").
             * Confirm whether an exact strcmp() was intended.
             */
            if (OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &p_set->source, &p_set->sink) ) {
                if( 0 == strncmp(f_set->local_target, f_set->remote_target, strlen(f_set->remote_target) ) ) {
                    request->is_done[cur_index] = true;
                    request->is_active[cur_index] = true;
                    request->exit_status[cur_index] = 0;
                    goto continue_set;
                }
            }

            if( request->movement_type == ORTE_FILEM_MOVE_TYPE_PUT ) {
                /*
                 * The file should exist if we are going to put it somewhere else
                 */
                if( 0 != access(f_set->local_target, R_OK) ) {
                    OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                                         "filem:rsh: copy(): %s -> %s: Error: Cannot move file %s to %s. Does not exist at source\n",
                                         ORTE_NAME_PRINT(&p_set->source),
                                         ORTE_NAME_PRINT(&p_set->sink),
                                         f_set->local_target,
                                         f_set->remote_target));
                    orte_show_help("help-orte-filem-rsh.txt",
                                   "orte-filem-rsh:get-file-not-exist",
                                   true, f_set->local_target, orte_process_info.nodename);
                    request->is_done[cur_index] = true;
                    request->is_active[cur_index] = true;
                    request->exit_status[cur_index] = -1;
                    goto continue_set;
                }
            }
            /* Do not check a local get() operation, to help supress the warnings from the HNP */
            else if (OPAL_EQUAL != orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &p_set->source, &p_set->sink) ) {
                char *base = NULL;
                /* The caller owns opal_basename()'s result (see
                 * opal/util/basename.h); it used to be leaked here. */
                char *remote_base = opal_basename(f_set->remote_target);

                if( NULL == remote_base ||
                    0 > asprintf(&base, "%s/%s", f_set->local_target, remote_base) ) {
                    free(remote_base);
                    exit_status = ORTE_ERR_OUT_OF_RESOURCE;
                    goto cleanup;
                }
                free(remote_base);
                remote_base = NULL;

                /*
                 * The file should not exist if we are getting a file with the
                 * same name since we do not want to overwrite the filename
                 * without the users consent.
                 */
                if( 0 == access(base, R_OK) ) {
                    OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                                         "filem:rsh: copy(): %s -> %s: Error: Cannot move file %s to %s. Already exists at destination (%s)\n",
                                         ORTE_NAME_PRINT(&p_set->source),
                                         ORTE_NAME_PRINT(&p_set->sink),
                                         f_set->remote_target,
                                         f_set->local_target, base));
                    orte_show_help("help-orte-filem-rsh.txt",
                                   "orte-filem-rsh:get-file-exists",
                                   true, f_set->local_target, orte_process_info.nodename);
                    free(base);
                    base = NULL;
                    request->is_done[cur_index] = true;
                    request->is_active[cur_index] = true;
                    request->exit_status[cur_index] = -1;
                    goto continue_set;
                }
                free(base);
                base = NULL;
            }

            if( request->movement_type == ORTE_FILEM_MOVE_TYPE_PUT ) {
                OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                                     "filem:rsh: copy(): %s -> %s: Moving file %s %s to %s %s\n",
                                     ORTE_NAME_PRINT(&p_set->source),
                                     ORTE_NAME_PRINT(&p_set->sink),
                                     (f_set->local_hint == ORTE_FILEM_HINT_SHARED ? "(S)" : ""),
                                     f_set->local_target,
                                     (f_set->remote_hint == ORTE_FILEM_HINT_SHARED ? "(S)" : ""),
                                     f_set->remote_target));
            } else {
                OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                                     "filem:rsh: copy(): %s -> %s: Moving file %s %s to %s %s\n",
                                     ORTE_NAME_PRINT(&p_set->source),
                                     ORTE_NAME_PRINT(&p_set->sink),
                                     (f_set->remote_hint == ORTE_FILEM_HINT_SHARED ? "(S)" : ""),
                                     f_set->remote_target,
                                     (f_set->local_hint == ORTE_FILEM_HINT_SHARED ? "(S)" : ""),
                                     f_set->local_target));
            }

            /*
             * Get the remote machine identifier from the process_name struct
             */
            OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                                 "filem:rsh: copy(): %s -> %s: Get node name.\n",
                                 ORTE_NAME_PRINT(&p_set->source),
                                 ORTE_NAME_PRINT(&p_set->sink)));
            if( ORTE_SUCCESS != (ret = orte_filem_base_get_proc_node_name(&p_set->source, &remote_machine))) {
                opal_output(mca_filem_rsh_component.super.output_handle,
                            "filem:rsh: copy(): Get Node Name failed (%d)", ret);
                exit_status = ret;
                goto cleanup;
            }
            OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                                 "filem:rsh: copy(): %s -> %s: Got node name: %s\n",
                                 ORTE_NAME_PRINT(&p_set->source),
                                 ORTE_NAME_PRINT(&p_set->sink),
                                 remote_machine));

            /*
             * Fix the remote_filename.
             * If it is an absolute path, then assume it is valid for the remote server
             * ow then we must construct the correct path.
             */
            OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                                 "filem:rsh: copy(): %s -> %s: Query remote path (%s).\n",
                                 ORTE_NAME_PRINT(&p_set->source),
                                 ORTE_NAME_PRINT(&p_set->sink),
                                 f_set->remote_target));
            remote_file = strdup(f_set->remote_target);
            if( ORTE_SUCCESS != (ret = orte_filem_rsh_query_remote_path(&remote_file, &p_set->source, &f_set->target_flag) ) ) {
                opal_output(mca_filem_rsh_component.super.output_handle,
                            "filem:rsh: copy(): Query Remote Path failed (%d)", ret);
                exit_status = ret;
                goto cleanup;
            }
            OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                                 "filem:rsh: copy(): %s -> %s: Remote path (%s) is (%s).\n",
                                 ORTE_NAME_PRINT(&p_set->source),
                                 ORTE_NAME_PRINT(&p_set->sink),
                                 f_set->remote_target,
                                 remote_file));

            /*
             * Transfer the file or directory
             */
            if(ORTE_FILEM_TYPE_DIR == f_set->target_flag) {
                dir_arg = strdup(" -r ");
            }
            else if(ORTE_FILEM_TYPE_UNKNOWN == f_set->target_flag) {
                opal_output(mca_filem_rsh_component.super.output_handle,
                            "filem:rsh: copy(): Error: File type unknown (%s)",
                            f_set->remote_target);
                request->is_done[cur_index] = true;
                request->is_active[cur_index] = true;
                request->exit_status[cur_index] = -1;
                goto continue_set;
            }
            else {
                dir_arg = strdup("");
            }

            /*
             * If this is the put() routine
             */
            if( request->movement_type == ORTE_FILEM_MOVE_TYPE_PUT ) {
                /* Use a local 'cp' when able */
                if(f_set->remote_hint == ORTE_FILEM_HINT_SHARED ) {
                    len = asprintf(&command, "%s %s %s %s ",
                                   mca_filem_rsh_component.cp_local_command,
                                   dir_arg,
                                   f_set->local_target,
                                   remote_file);
                } else {
                    len = asprintf(&command, "%s %s %s %s:%s ",
                                   mca_filem_rsh_component.cp_command,
                                   dir_arg,
                                   f_set->local_target,
                                   remote_machine,
                                   remote_file);
                }
            }
            /*
             * ow it is the get() routine
             */
            else {
                /* Use a local 'cp' when able */
                if(f_set->local_hint == ORTE_FILEM_HINT_SHARED ) {
                    len = asprintf(&command, "%s %s %s %s %s %s ",
                                   mca_filem_rsh_component.remote_sh_command,
                                   remote_machine,
                                   mca_filem_rsh_component.cp_local_command,
                                   dir_arg,
                                   remote_file,
                                   f_set->local_target);
                } else {
                    len = asprintf(&command, "%s %s %s:%s %s ",
                                   mca_filem_rsh_component.cp_command,
                                   dir_arg,
                                   remote_machine,
                                   remote_file,
                                   f_set->local_target);
                }
            }
            if( 0 > len ) {
                /* asprintf() leaves the output pointer undefined on failure */
                command = NULL;
                exit_status = ORTE_ERR_OUT_OF_RESOURCE;
                goto cleanup;
            }

            /*
             * Start the command
             */
            OPAL_OUTPUT_VERBOSE((17, mca_filem_rsh_component.super.output_handle,
                                 "filem:rsh:%s about to execute [%s]",
                                 (request->movement_type == ORTE_FILEM_MOVE_TYPE_PUT ? "put" : "get"),
                                 command));
            if( ORTE_SUCCESS != (ret = orte_filem_rsh_start_command(p_set,
                                                                    f_set,
                                                                    command,
                                                                    request,
                                                                    cur_index)) ) {
                exit_status = ret;
                goto cleanup;
            }

        continue_set:
            /* A small bit of cleanup.  start_command() keeps its own strdup()
             * of the command, so the per-pair copy must be released here;
             * previously only the last one was freed (at cleanup). */
            if( NULL != command ) {
                free(command);
                command = NULL;
            }
            if( NULL != dir_arg) {
                free(dir_arg);
                dir_arg = NULL;
            }
            if( NULL != remote_file) {
                free(remote_file);
                remote_file = NULL;
            }
            if(NULL != remote_machine) {
                free(remote_machine);
                remote_machine = NULL;
            }
            ++cur_index;
        } /* For each process set */
    } /* For each file pair */

 cleanup:
    if( NULL != command )
        free(command);
    if( NULL != remote_machine)
        free(remote_machine);
    if( NULL != dir_arg)
        free(dir_arg);
    if( NULL != remote_file)
        free(remote_file);
    return exit_status;
}
/*
 * Launch one remote 'rm' per process set of an RM request.
 *
 * For each process set, every file set's remote path is resolved. Paths
 * whose type is unknown are skipped. The remaining paths are joined into a
 * single "<remote_sh> <host> rm <flags> <paths...>" command, which is queued
 * via orte_filem_rsh_start_command(). The flags are " -rf " if any path is a
 * directory, else " -f ". Slot cur_index advances once per process set.
 *
 * @param request  Request already prepared for RM.
 * @return ORTE_SUCCESS; ORTE_ERR_OUT_OF_RESOURCE if the command string could
 *         not be built; otherwise the error from node-name lookup,
 *         remote-path query or command start.
 */
static int orte_filem_rsh_start_rm(orte_filem_base_request_t *request)
{
    int ret = ORTE_SUCCESS, exit_status = ORTE_SUCCESS;
    opal_list_item_t *f_item = NULL;
    opal_list_item_t *p_item = NULL;
    char *command = NULL;
    char *remote_machine = NULL;
    char *remote_targets = NULL;
    char *remote_file = NULL;
    char *dir_arg = NULL;
    char **remote_file_set = NULL;
    int argc = 0;
    int cur_index = 0;

    /* For each process set */
    for (p_item = opal_list_get_first( &request->process_sets);
         p_item != opal_list_get_end( &request->process_sets);
         p_item = opal_list_get_next( p_item) ) {
        orte_filem_base_process_set_t * p_set = (orte_filem_base_process_set_t*)p_item;

        /*
         * Get the remote machine identifier from the process_name struct
         */
        if( ORTE_SUCCESS != (ret = orte_filem_base_get_proc_node_name(&p_set->source, &remote_machine))) {
            exit_status = ret;
            goto cleanup;
        }

        /* For each file pair */
        for (f_item = opal_list_get_first( &request->file_sets);
             f_item != opal_list_get_end( &request->file_sets);
             f_item = opal_list_get_next( f_item) ) {
            orte_filem_base_file_set_t * f_set = (orte_filem_base_file_set_t*)f_item;

            /*
             * Fix the remote_filename.
             * If it is an absolute path, then assume it is valid for the remote server
             * ow then we must construct the correct path.
             */
            remote_file = strdup(f_set->remote_target);
            if( ORTE_SUCCESS != (ret = orte_filem_rsh_query_remote_path(&remote_file, &p_set->source, &f_set->target_flag) ) ) {
                exit_status = ret;
                goto cleanup;
            }

            if(ORTE_FILEM_TYPE_UNKNOWN == f_set->target_flag) {
                /* Skipped, but our copy must still be released */
                free(remote_file);
                remote_file = NULL;
                continue;
            }

            /* opal_argv_append() stores its own copy of the string (see
             * opal/util/argv.h), so ours is released right after. */
            opal_argv_append(&argc, &remote_file_set, remote_file);
            free(remote_file);
            remote_file = NULL;

            /*
             * If we are removing a directory in the mix, then we
             * need the recursive argument.
             */
            if(NULL == dir_arg) {
                if(ORTE_FILEM_TYPE_DIR == f_set->target_flag) {
                    dir_arg = strdup(" -rf ");
                }
            }
        } /* All File Pairs */

        if(NULL == dir_arg) {
            dir_arg = strdup(" -f ");
        }

        remote_targets = opal_argv_join(remote_file_set, ' ');

        if( 0 > asprintf(&command, "%s %s rm %s %s ",
                         mca_filem_rsh_component.remote_sh_command,
                         remote_machine,
                         dir_arg,
                         remote_targets) ) {
            /* asprintf() leaves the output pointer undefined on failure */
            command = NULL;
            exit_status = ORTE_ERR_OUT_OF_RESOURCE;
            goto cleanup;
        }

        OPAL_OUTPUT_VERBOSE((15, mca_filem_rsh_component.super.output_handle,
                             "filem:rsh:rm about to execute [%s]", command));

        if( ORTE_SUCCESS != (ret = orte_filem_rsh_start_command(p_set,
                                                                NULL,
                                                                command,
                                                                request,
                                                                cur_index)) ) {
            exit_status = ret;
            goto cleanup;
        }

        /* A small bit of cleanup.  start_command() keeps its own strdup()
         * of the command, so the per-process-set copy is released here. */
        free(command);
        command = NULL;
        if( NULL != dir_arg) {
            free(dir_arg);
            dir_arg = NULL;
        }
        if( NULL != remote_targets) {
            free(remote_targets);
            remote_targets = NULL;
        }
        if( NULL != remote_file_set) {
            opal_argv_free(remote_file_set);
            remote_file_set = NULL;
            argc = 0;
        }
        if(NULL != remote_machine) {
            free(remote_machine);
            remote_machine = NULL;
        }
        ++cur_index;
    } /* Process set */

 cleanup:
    if( NULL != command )
        free(command);
    if( NULL != remote_machine)
        free(remote_machine);
    if( NULL != remote_file)
        free(remote_file);
    if( NULL != dir_arg)
        free(dir_arg);
    if( NULL != remote_targets)
        free(remote_targets);
    if( NULL != remote_file_set)
        opal_argv_free(remote_file_set);
    return exit_status;
}
/******************
* Local Functions
******************/
/******************************
* Work Pool functions
******************************/
/*
 * Wrap 'command' in a new work pool item and queue it.
 *
 * The item copies the process-set names and, when given, strdup()s the file
 * set's targets. It also takes its own copy of the command and retains a
 * reference on 'request'.
 *  - If orte_filem_rsh_max_outgoing (> 0) sends are already outstanding, the
 *    item goes on work_pool_held.
 *  - Otherwise cur_num_outgoing is bumped, the item goes on
 *    work_pool_pending, and permission to send is requested from the source
 *    process.
 *
 * @param proc_set  Source/sink names for the transfer (may be NULL).
 * @param file_set  File targets (may be NULL, e.g. for rm).
 * @param command   Command line; the caller keeps ownership of this string.
 * @param request   Owning request; a reference is retained.
 * @param index     Slot of 'request' this command completes.
 * @return ORTE_SUCCESS, ORTE_ERR_OUT_OF_RESOURCE if the item cannot be
 *         allocated, or the error from orte_filem_rsh_permission_ask().
 */
static int orte_filem_rsh_start_command(orte_filem_base_process_set_t *proc_set,
                                        orte_filem_base_file_set_t *file_set,
                                        char * command,
                                        orte_filem_base_request_t *request,
                                        int index)
{
    orte_filem_rsh_work_pool_item_t *wp_item = NULL;
    int ret;

    /* Construct a work pool item */
    wp_item = OBJ_NEW(orte_filem_rsh_work_pool_item_t);
    if( NULL == wp_item ) {
        return ORTE_ERR_OUT_OF_RESOURCE;
    }

    /* Copy the Process Set (whole-struct copy keeps every name field) */
    if( NULL != proc_set ) {
        wp_item->proc_set.source = proc_set->source;
        wp_item->proc_set.sink   = proc_set->sink;
    }

    /* Copy the File Set */
    if( NULL != file_set ) {
        wp_item->file_set.local_target  = strdup(file_set->local_target);
        wp_item->file_set.remote_target = strdup(file_set->remote_target);
        wp_item->file_set.target_flag   = file_set->target_flag;
    }

    OBJ_RETAIN(request);
    wp_item->command = strdup(command);
    wp_item->request = request;
    wp_item->index   = index;

    if( orte_filem_rsh_max_outgoing > 0 && cur_num_outgoing >= orte_filem_rsh_max_outgoing ) {
        OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                             "filem:rsh: start_command(): *** Hold send from proc %s (%d of %d)",
                             ORTE_NAME_PRINT(&(wp_item->proc_set.source)), cur_num_outgoing, orte_filem_rsh_max_outgoing));
        /*
         * - put the request on the held list, since at most
         *   orte_filem_rsh_max_outgoing sends may be outstanding at a time
         */
        opal_list_append(&work_pool_held, &(wp_item->super));
    }
    else {
        ++cur_num_outgoing;
        /*
         * - put the request on the pending list
         * - wait for the peer to tell us that it can receive
         */
        opal_list_append(&work_pool_pending, &(wp_item->super));

        /*
         * Ask for permission to send this file so we do not overwhelm the peer
         * Allow only one file request at a time.
         * JJH: Look into permission for multiple file permissions at a time
         *
         * The item's own copy of the source name is used: it is always
         * valid, whereas proc_set may be NULL (see the guard above).
         */
        OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                             "filem:rsh: start_command(): Ask permission to send from proc %s (%d of %d)",
                             ORTE_NAME_PRINT(&(wp_item->proc_set.source)), cur_num_outgoing, orte_filem_rsh_max_outgoing));
        if( ORTE_SUCCESS != (ret = orte_filem_rsh_permission_ask(&(wp_item->proc_set.source), 1)) ) {
            return ret;
        }
    }

    return ORTE_SUCCESS;
}
/******************************
* Child process start and wait functions
******************************/
/*
 * Fork a child that runs `command` and register filem_rsh_waitpid_cb() to be
 * called when that child exits.
 *
 * The command string is split on spaces with opal_argv_split() and executed
 * directly with execvp() (argv[0] is searched for in PATH; no shell is
 * involved). The child's stdout is redirected to /dev/null.
 *
 * Side effects on the request slot `index`:
 *   - is_done[index] = false, is_active[index] = true
 *   - exit_status[index] temporarily holds the child's pid (or -1 if fork()
 *     failed); filem_rsh_waitpid_cb() later overwrites it with the exit status.
 *
 * Returns ORTE_SUCCESS, ORTE_ERROR if fork() failed, or the error returned
 * by orte_wait_cb().
 */
static int start_child(char * command,
                       orte_filem_base_request_t *request,
                       int index)
{
    char **argv = NULL;
    int status = -1, ret;

    OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                         "filem:rsh: start_child(): Starting the command [%s]",
                         command));

    /* fork() -> done = false, active = true */
    request->is_done[index]     = false;
    request->is_active[index]   = true;
    request->exit_status[index] = fork();

    if( request->exit_status[index] == 0 ) { /* Child */
        /* Redirect stdout to /dev/null */
        freopen( "/dev/null", "w", stdout);

        argv = opal_argv_split(command, ' ');
        /*
         * A blank command (or an allocation failure) may leave us without an
         * argv[0]; never hand a NULL program name to execvp().
         */
        if( NULL != argv && NULL != argv[0] ) {
            status = execvp(argv[0], argv);
        }
        opal_output(0, "filem:rsh:start_child Failed to exec child [%s] status = %d\n", command, status);
        /*
         * Use _exit() rather than exit(): a forked child must not run the
         * parent's atexit handlers or flush stdio buffers it inherited.
         */
        _exit(ORTE_ERROR);
    }
    else if( request->exit_status[index] > 0 ) {
        OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                             "filem:rsh: start_child(): Started Child %d Running command [%s]",
                             request->exit_status[index], command));

        /*
         * Register a callback for when this process exits
         */
        if( ORTE_SUCCESS != (ret = orte_wait_cb(request->exit_status[index], filem_rsh_waitpid_cb, NULL) ) ) {
            opal_output(0, "filem:rsh: start_child(): Failed to register a waitpid callback for child [%d] executing the command [%s]\n",
                        request->exit_status[index], command);
            return ret;
        }
    }
    else {
        /* fork() failed: no child exists, so no waitpid callback will fire */
        opal_output(0, "filem:rsh: start_child(): fork() failed for the command [%s]\n",
                    command);
        return ORTE_ERROR;
    }

    return ORTE_SUCCESS;
}
/*
 * Exit callback registered by start_child() through orte_wait_cb().
 *
 * Under work_pool_lock it:
 *   1. finds the active work item whose request slot is not yet done and
 *      whose exit_status still holds `pid`, and records `status` there
 *      (marking the slot done and no longer active);
 *   2. gives back one outgoing slot (cur_num_outgoing);
 *   3. if any request is being held, moves the oldest one to the pending
 *      list, takes the slot for it and asks its peer for permission to send;
 *   4. signals work_pool_cond so anyone waiting on a child can re-check.
 *
 * `cbdata` is unused.
 */
static void filem_rsh_waitpid_cb(pid_t pid, int status, void* cbdata)
{
    opal_list_item_t *cursor = NULL;
    orte_filem_rsh_work_pool_item_t *next_up = NULL;
    int rc;

    OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                         "filem:rsh: waitpid_cb(): Pid %d finished with status [%d].\n",
                         pid, status));

    OPAL_THREAD_LOCK(&work_pool_lock);

    /* start_child() parked the pid in exit_status[]; look for that slot */
    cursor = opal_list_get_first(&work_pool_active);
    while( cursor != opal_list_get_end(&work_pool_active) ) {
        orte_filem_rsh_work_pool_item_t *entry = (orte_filem_rsh_work_pool_item_t *)cursor;
        orte_filem_base_request_t *req = entry->request;
        int slot = entry->index;

        if( !req->is_done[slot] && pid == req->exit_status[slot] ) {
            /* waitpid() -> done = true, active = false */
            req->exit_status[slot] = status;
            req->is_done[slot]     = true;
            req->is_active[slot]   = false;
            OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                                 "filem:rsh: waitpid_cb(): Marked pid %d as complete [status = %d].\n",
                                 pid, status));
            break;
        }
        cursor = opal_list_get_next(cursor);
    }

    /* The finished child no longer occupies an outgoing slot */
    --cur_num_outgoing;

    /* Hand the freed slot to the oldest held request, if there is one */
    if( 0 < opal_list_get_size(&work_pool_held) ) {
        next_up = (orte_filem_rsh_work_pool_item_t *)opal_list_remove_first(&work_pool_held);
        ++cur_num_outgoing;

        /* It waits on the pending list until the peer says it can receive */
        opal_list_append(&work_pool_pending, &(next_up->super));

        OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                             "filem:rsh: start_command(): Ask permission to send from proc %s (*** Activate Held)",
                             ORTE_NAME_PRINT(&(next_up->proc_set.source))));
        rc = orte_filem_rsh_permission_ask(&(next_up->proc_set.source), 1);
        if( ORTE_SUCCESS != rc ) {
            opal_output(0, "ERROR: Failed to ask permission!\n");
        }
    }

    /* Wake anyone blocked waiting for a child to finish */
    opal_condition_signal(&work_pool_cond);
    OPAL_THREAD_UNLOCK(&work_pool_lock);
}
/******************************
* Path resolution functions
******************************/
/*
 * Work out what *remote_ref refers to on `peer`. On the remote machine this
 * is paired with filem_base_process_get_remote_path_cmd().
 *
 * An absolute remote path (leading '/') needs no lookup. It is reported as
 * a directory (*flag = ORTE_FILEM_TYPE_DIR): if it is really a file the copy
 * command still works, and if it does not exist scp fails, which
 * filem_rsh_waitpid_cb() catches. Any other path is resolved by
 * orte_filem_base_get_remote_path().
 *
 * Returns ORTE_SUCCESS or the error from orte_filem_base_get_remote_path().
 */
static int orte_filem_rsh_query_remote_path(char **remote_ref, orte_process_name_t *peer, int *flag) {
    const char *path = *remote_ref;

    if( '/' != path[0] ) {
        /* Not absolute: let the base function resolve it */
        return orte_filem_base_get_remote_path(remote_ref, peer, flag);
    }

    *flag = ORTE_FILEM_TYPE_DIR;
    return ORTE_SUCCESS;
}
/******************************
* Permission functions
******************************/
/*
 * Post a persistent, non-blocking receive for ORTE_RML_TAG_FILEM_RSH from
 * any peer. Every permission message sent on that tag is then delivered to
 * orte_filem_rsh_permission_callback().
 *
 * Returns ORTE_SUCCESS, or the RML error code after logging it.
 */
static int orte_filem_rsh_permission_listener_init(void)
{
    int rc;

    rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
                                 ORTE_RML_TAG_FILEM_RSH,
                                 ORTE_RML_PERSISTENT,
                                 orte_filem_rsh_permission_callback,
                                 NULL);
    if( ORTE_SUCCESS == rc ) {
        return ORTE_SUCCESS;
    }

    opal_output(mca_filem_rsh_component.super.output_handle,
                "filem:rsh: listener_init: Failed to register the receive callback (%d)",
                rc);
    return rc;
}
/*
 * Cancel the wildcard receive on ORTE_RML_TAG_FILEM_RSH posted by
 * orte_filem_rsh_permission_listener_init().
 *
 * Returns the result of orte_rml.recv_cancel() unchanged. A failure is
 * deliberately not logged here; the caller decides what to do with it.
 */
static int orte_filem_rsh_permission_listener_cancel(void)
{
    return orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_FILEM_RSH);
}
/*
 * Helpers for orte_filem_rsh_permission_callback(), one per message type.
 * Each one receives the buffer after the permission flag has already been
 * unpacked. If a later unpack fails, the helper stops handling the message.
 *
 * NOTE(review): unlike filem_rsh_waitpid_cb(), these helpers modify the
 * work-pool lists and counters without taking work_pool_lock; presumably
 * both callbacks run from the same progress thread -- confirm.
 */

/*
 * ORTE_FILEM_RSH_ASK: `sender` wants permission to start sending to us.
 * One slot is granted immediately unless orte_filem_rsh_max_incomming is
 * positive and already reached. In that case the sender is queued on
 * work_pool_waiting, and a later DONE message will admit it.
 */
static void filem_rsh_permission_handle_ask(orte_process_name_t *sender,
                                            opal_buffer_t *buffer)
{
    orte_filem_rsh_work_pool_item_t *wp_item = NULL;
    orte_std_cntr_t n;
    int ret, num_req, num_allowed;

    OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                         "filem:rsh: permission_callback(ASK): Peer %s Asking permission to send [Used %d of %d]",
                         ORTE_NAME_PRINT(sender),
                         cur_num_incomming,
                         orte_filem_rsh_max_incomming));

    /*
     * Receive the requested amount. It is not used yet: at most one transfer
     * is granted per ASK, and the sender only ever asks for one.
     */
    n = 1;
    if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &num_req, &n, OPAL_INT))) {
        return;
    }

    if( orte_filem_rsh_max_incomming > 0 && orte_filem_rsh_max_incomming < cur_num_incomming + 1) {
        /* No free slot: remember the sender so a DONE can admit it later */
        OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                             "filem:rsh: permission_callback(ASK): Add Peer %s request to waiting list",
                             ORTE_NAME_PRINT(sender)));

        wp_item = OBJ_NEW(orte_filem_rsh_work_pool_item_t);
        wp_item->proc_set.source.jobid = sender->jobid;
        wp_item->proc_set.source.vpid  = sender->vpid;
        opal_list_append(&work_pool_waiting, &(wp_item->super));
        return;
    }

    /*
     * Start the transfer immediately. Allow only one file request at a time.
     * JJH: Look into permission for multiple file permissions at a time
     */
    num_allowed = 1;
    cur_num_incomming += 1;

    OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                         "filem:rsh: permission_callback(ASK): Respond to Peer %s with %d",
                         ORTE_NAME_PRINT(sender), num_allowed));
    permission_send_num_allowed(sender, num_allowed);
}

/*
 * ORTE_FILEM_RSH_ALLOW: `sender` lets us start some number of sends to it.
 * For each granted slot, this detaches the first pending request whose
 * source is `sender`, moves it to the active list and forks its copy
 * command.
 */
static void filem_rsh_permission_handle_allow(orte_process_name_t *sender,
                                              opal_buffer_t *buffer)
{
    orte_filem_rsh_work_pool_item_t *wp_item = NULL;
    opal_list_item_t *item = NULL;
    orte_std_cntr_t n;
    int ret, i, num_allowed = 0;

    OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                         "filem:rsh: permission_callback(ALLOW): Peer %s Allowing me to send",
                         ORTE_NAME_PRINT(sender)));

    /* Receive the allowed transmit amount */
    n = 1;
    if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &num_allowed, &n, OPAL_INT))) {
        return;
    }

    for(i = 0; i < num_allowed; ++i ) {
        if( 0 >= opal_list_get_size(&work_pool_pending) ) {
            OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                                 "filem:rsh: permission_callback(ALLOW): No more pending sends to peer %s...",
                                 ORTE_NAME_PRINT(sender)));
            break;
        }

        /* Find, and unlink, the first pending request aimed at this peer */
        wp_item = NULL;
        for (item = opal_list_get_first( &work_pool_pending);
             item != opal_list_get_end( &work_pool_pending);
             item = opal_list_get_next( item) ) {
            orte_filem_rsh_work_pool_item_t *candidate = (orte_filem_rsh_work_pool_item_t *)item;
            if(sender->jobid == candidate->proc_set.source.jobid &&
               sender->vpid  == candidate->proc_set.source.vpid ) {
                opal_list_remove_item( &work_pool_pending, item);
                wp_item = candidate;
                break;
            }
        }

        /*
         * Nothing pending for this peer. Do not fall back on some other
         * item: it belongs to a different peer and is still linked on the
         * pending list, so appending it to the active list would corrupt
         * both lists and start the wrong transfer.
         */
        if( NULL == wp_item ) {
            OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                                 "filem:rsh: permission_callback(ALLOW): Unable to find message on the pending list\n"));
            break;
        }

        OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                             "filem:rsh: permission_callback(ALLOW): Starting to send to peer %s... (# pending = %d)",
                             ORTE_NAME_PRINT(sender), (int)opal_list_get_size(&work_pool_pending)));

        wp_item->active = true;
        opal_list_append(&work_pool_active, &(wp_item->super));
        if( ORTE_SUCCESS != (ret = start_child(wp_item->command,
                                               wp_item->request,
                                               wp_item->index)) ) {
            return;
        }
    }
}

/*
 * Unpack the three strings that follow a non-zero DONE status (local
 * target, remote target, remote command) and show the
 * "orte-filem-rsh:remote-get-failed" help message. Every unpacked string
 * is freed on all paths.
 *
 * Returns ORTE_SUCCESS, or the error from the first unpack that failed.
 */
static int filem_rsh_permission_report_failure(orte_process_name_t *sender,
                                               int32_t peer_status,
                                               opal_buffer_t *buffer)
{
    char *local_target  = NULL;
    char *remote_target = NULL;
    char *remote_cmd    = NULL;
    orte_std_cntr_t n;
    int ret;

    n = 1;
    if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &local_target, &n, OPAL_STRING))) {
        ORTE_ERROR_LOG(ret);
        goto cleanup;
    }
    n = 1;
    if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &remote_target, &n, OPAL_STRING))) {
        ORTE_ERROR_LOG(ret);
        goto cleanup;
    }
    n = 1;
    if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &remote_cmd, &n, OPAL_STRING))) {
        ORTE_ERROR_LOG(ret);
        goto cleanup;
    }

    orte_show_help("help-orte-filem-rsh.txt",
                   "orte-filem-rsh:remote-get-failed",
                   true, ORTE_NAME_PRINT(sender), peer_status,
                   local_target,
                   remote_target,
                   remote_cmd);

 cleanup:
    /* free(NULL) is a no-op, so this is safe after a partial unpack */
    free(local_target);
    free(remote_target);
    free(remote_cmd);
    return ret;
}

/*
 * ORTE_FILEM_RSH_DONE: `sender` has finished sending one or more files to
 * us. This releases the slots it reports, shows an error if its copy
 * failed, and admits as many queued senders from work_pool_waiting as
 * there are freed slots.
 */
static void filem_rsh_permission_handle_done(orte_process_name_t *sender,
                                             opal_buffer_t *buffer)
{
    orte_filem_rsh_work_pool_item_t *wp_item = NULL;
    opal_list_item_t *item = NULL;
    orte_std_cntr_t n;
    int ret, i, num_req;
    int32_t peer_status = 0;

    OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                         "filem:rsh: permission_callback(DONE): Peer %s is done sending to me",
                         ORTE_NAME_PRINT(sender)));

    /* Receive the number of open slots */
    n = 1;
    if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &num_req, &n, OPAL_INT))) {
        return;
    }
    cur_num_incomming -= num_req;

    /* Receive the exit status; a non-zero status carries failure details */
    n = 1;
    if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &peer_status, &n, OPAL_INT32))) {
        return;
    }
    if( peer_status != 0 &&
        ORTE_SUCCESS != filem_rsh_permission_report_failure(sender, peer_status, buffer) ) {
        return;
    }

    /* For each open slot, notify a waiting peer that it may send */
    for(i = 0; i < num_req; ++i ) {
        item = opal_list_get_first( &work_pool_waiting);
        if( item == opal_list_get_end( &work_pool_waiting) ) {
            break; /* nobody else is waiting */
        }
        wp_item = (orte_filem_rsh_work_pool_item_t *)item;
        cur_num_incomming += 1;
        opal_list_remove_item(&work_pool_waiting, item);
        permission_send_num_allowed(&(wp_item->proc_set.source), 1);
        OBJ_RELEASE(wp_item);
    }
}

/*
 * Persistent RML receive callback for ORTE_RML_TAG_FILEM_RSH (posted by
 * orte_filem_rsh_permission_listener_init()).
 *
 * The first packed OPAL_INT is the permission flag:
 *   - ORTE_FILEM_RSH_ASK   : a peer asks for permission to send to us
 *   - ORTE_FILEM_RSH_ALLOW : a peer allows us to start sending to it
 *   - ORTE_FILEM_RSH_DONE  : a peer is done sending to us
 * Any other value is logged (verbose) and ignored. `status`, `tag` and
 * `cbdata` are unused.
 */
static void orte_filem_rsh_permission_callback(int status,
                                               orte_process_name_t* sender,
                                               opal_buffer_t *buffer,
                                               orte_rml_tag_t tag,
                                               void* cbdata)
{
    orte_std_cntr_t n;
    int ret, perm_flag;

    OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                         "filem:rsh: permission_callback(? ?): Peer %s ...",
                         ORTE_NAME_PRINT(sender)));

    n = 1;
    if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &perm_flag, &n, OPAL_INT))) {
        return;
    }

    if( ORTE_FILEM_RSH_ASK == perm_flag ) {
        filem_rsh_permission_handle_ask(sender, buffer);
    }
    else if( ORTE_FILEM_RSH_ALLOW == perm_flag ) {
        filem_rsh_permission_handle_allow(sender, buffer);
    }
    else if( ORTE_FILEM_RSH_DONE == perm_flag ) {
        filem_rsh_permission_handle_done(sender, buffer);
    }
    else {
        OPAL_OUTPUT_VERBOSE((10, mca_filem_rsh_component.super.output_handle,
                             "filem:rsh: permission_callback(): Peer %s sent an unknown permission flag (%d), ignoring",
                             ORTE_NAME_PRINT(sender), perm_flag));
    }
}
/*
 * Send an ORTE_FILEM_RSH_ASK message to `source`, requesting permission to
 * start `num_sends` transfers to it. The peer answers with an ALLOW
 * message, either right away or once it has a free slot.
 *
 * Wire format: OPAL_INT flag, OPAL_INT num_sends.
 * Returns ORTE_SUCCESS, a pack error, or the negative RML send error.
 */
static int orte_filem_rsh_permission_ask(orte_process_name_t* source,
                                         int num_sends)
{
    opal_buffer_t msg;
    int flag = ORTE_FILEM_RSH_ASK;
    int rc;

    OBJ_CONSTRUCT(&msg, opal_buffer_t);

    rc = opal_dss.pack(&msg, &flag, 1, OPAL_INT);
    if (ORTE_SUCCESS == rc) {
        rc = opal_dss.pack(&msg, &num_sends, 1, OPAL_INT);
    }
    if (ORTE_SUCCESS == rc) {
        /* send_buffer() reports success with a non-negative value */
        rc = orte_rml.send_buffer(source, &msg, ORTE_RML_TAG_FILEM_RSH, 0);
        if (0 <= rc) {
            rc = ORTE_SUCCESS;
        }
    }

    OBJ_DESTRUCT(&msg);
    return rc;
}
/*
 * Tell `peer` that we have finished receiving from it (ORTE_FILEM_RSH_DONE)
 * and release `num_avail` of its slots.
 *
 * Wire format: OPAL_INT flag, OPAL_INT num_avail, OPAL_INT32 status. When
 * `status` is non-zero it is followed by three OPAL_STRINGs (local_target,
 * remote_target, command), which the receiver uses in its error report.
 *
 * Returns ORTE_SUCCESS, a pack error, or the negative RML send error.
 */
static int permission_send_done(orte_process_name_t* peer, int num_avail,
                                int32_t status,
                                char * local_target,
                                char * remote_target,
                                char * command) {
    opal_buffer_t msg;
    int flag = ORTE_FILEM_RSH_DONE;
    int rc;

    OBJ_CONSTRUCT(&msg, opal_buffer_t);

    /* Fixed header */
    rc = opal_dss.pack(&msg, &flag, 1, OPAL_INT);
    if (ORTE_SUCCESS == rc) {
        rc = opal_dss.pack(&msg, &num_avail, 1, OPAL_INT);
    }
    if (ORTE_SUCCESS == rc) {
        rc = opal_dss.pack(&msg, &status, 1, OPAL_INT32);
    }

    /* Failure details, only present when the transfer did not succeed */
    if (ORTE_SUCCESS == rc && 0 != status) {
        rc = opal_dss.pack(&msg, &local_target, 1, OPAL_STRING);
        if (ORTE_SUCCESS == rc) {
            rc = opal_dss.pack(&msg, &remote_target, 1, OPAL_STRING);
        }
        if (ORTE_SUCCESS == rc) {
            rc = opal_dss.pack(&msg, &command, 1, OPAL_STRING);
        }
    }

    if (ORTE_SUCCESS == rc) {
        /* send_buffer() reports success with a non-negative value */
        rc = orte_rml.send_buffer(peer, &msg, ORTE_RML_TAG_FILEM_RSH, 0);
        if (0 <= rc) {
            rc = ORTE_SUCCESS;
        }
    }

    OBJ_DESTRUCT(&msg);
    return rc;
}
/*
 * Grant `peer` permission to start `num_allowed` sends to us
 * (ORTE_FILEM_RSH_ALLOW).
 *
 * Wire format: OPAL_INT flag, OPAL_INT num_allowed.
 * Returns ORTE_SUCCESS, a pack error, or the negative RML send error.
 */
static int permission_send_num_allowed(orte_process_name_t* peer, int num_allowed)
{
    opal_buffer_t msg;
    int flag = ORTE_FILEM_RSH_ALLOW;
    int rc;

    OBJ_CONSTRUCT(&msg, opal_buffer_t);

    rc = opal_dss.pack(&msg, &flag, 1, OPAL_INT);
    if (ORTE_SUCCESS == rc) {
        rc = opal_dss.pack(&msg, &num_allowed, 1, OPAL_INT);
    }
    if (ORTE_SUCCESS == rc) {
        /* send_buffer() reports success with a non-negative value */
        rc = orte_rml.send_buffer(peer, &msg, ORTE_RML_TAG_FILEM_RSH, 0);
        if (0 <= rc) {
            rc = ORTE_SUCCESS;
        }
    }

    OBJ_DESTRUCT(&msg);
    return rc;
}