1
1

Expand the dfs API to support retrieval, loading and purging of file maps.

This commit was SVN r27515.
This commit is contained in:
Ralph Castain 2012-10-29 23:05:45 +00:00
parent c0f1775620
commit e5e72c3137
8 changed files with 891 additions and 50 deletions

View File

@ -60,6 +60,19 @@ static void dfs_read(int fd, uint8_t *buffer,
long length,
orte_dfs_read_callback_fn_t cbfunc,
void *cbdata);
static void dfs_post_file_map(opal_byte_object_t *bo,
orte_dfs_post_callback_fn_t cbfunc,
void *cbdata);
static void dfs_get_file_map(orte_process_name_t *target,
orte_dfs_fm_callback_fn_t cbfunc,
void *cbdata);
static void dfs_load_file_maps(orte_jobid_t jobid,
opal_byte_object_t *bo,
orte_dfs_load_callback_fn_t cbfunc,
void *cbdata);
static void dfs_purge_file_maps(orte_jobid_t jobid,
orte_dfs_purge_callback_fn_t cbfunc,
void *cbdata);
/******************
* APP module
@ -71,7 +84,11 @@ orte_dfs_base_module_t orte_dfs_app_module = {
dfs_close,
dfs_get_file_size,
dfs_seek,
dfs_read
dfs_read,
dfs_post_file_map,
dfs_get_file_map,
dfs_load_file_maps,
dfs_purge_file_maps
};
static opal_list_t requests, active_files;
@ -128,6 +145,7 @@ static void recv_dfs(int status, orte_process_name_t* sender,
int64_t i64;
uint64_t rid;
orte_dfs_tracker_t *trk;
opal_byte_object_t *bo;
/* unpack the command this message is responding to */
cnt = 1;
@ -352,6 +370,79 @@ static void recv_dfs(int status, orte_process_name_t* sender,
OBJ_RELEASE(dfs);
break;
case ORTE_DFS_POST_CMD:
/* unpack the request id for this read */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) {
ORTE_ERROR_LOG(rc);
return;
}
/* search our list of requests to find the matching one */
dfs = NULL;
for (item = opal_list_get_first(&requests);
item != opal_list_get_end(&requests);
item = opal_list_get_next(item)) {
dptr = (orte_dfs_request_t*)item;
if (dptr->id == rid) {
/* request was fulfilled, so remove it */
opal_list_remove_item(&requests, item);
dfs = dptr;
break;
}
}
if (NULL == dfs) {
opal_output_verbose(1, orte_dfs_base.output,
"%s recvd post - no corresponding request found",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return;
}
if (NULL != dfs->post_cbfunc) {
dfs->post_cbfunc(dfs->cbdata);
}
OBJ_RELEASE(dfs);
break;
case ORTE_DFS_GETFM_CMD:
/* unpack the request id for this read */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) {
ORTE_ERROR_LOG(rc);
return;
}
/* search our list of requests to find the matching one */
dfs = NULL;
for (item = opal_list_get_first(&requests);
item != opal_list_get_end(&requests);
item = opal_list_get_next(item)) {
dptr = (orte_dfs_request_t*)item;
if (dptr->id == rid) {
/* request was fulfilled, so remove it */
opal_list_remove_item(&requests, item);
dfs = dptr;
break;
}
}
if (NULL == dfs) {
opal_output_verbose(1, orte_dfs_base.output,
"%s recvd getfm - no corresponding request found",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return;
}
/* unpack the byte object */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &bo, &cnt, OPAL_BYTE_OBJECT))) {
ORTE_ERROR_LOG(rc);
return;
}
/* return it to caller */
if (NULL != dfs->fm_cbfunc) {
dfs->fm_cbfunc(bo, dfs->cbdata);
}
OBJ_RELEASE(dfs);
break;
default:
opal_output(0, "APP:DFS:RECV WTF");
break;
@ -382,7 +473,6 @@ static void open_local_file(orte_dfs_request_t *dfs)
if (NULL != dfs->open_cbfunc) {
dfs->open_cbfunc(dfs->remote_fd, dfs->cbdata);
}
OBJ_RELEASE(dfs);
return;
}
/* otherwise, create a tracker for this file */
@ -1046,3 +1136,152 @@ static void dfs_read(int fd, uint8_t *buffer,
/* post it for processing */
ORTE_DFS_POST_REQUEST(dfs, process_reads);
}
static void process_posts(int fd, short args, void *cbdata)
{
orte_dfs_request_t *dfs = (orte_dfs_request_t*)cbdata;
opal_buffer_t *buffer;
int rc;
/* we will get confirmation in our receive function, so
* add this request to our list */
dfs->id = req_id++;
opal_list_append(&requests, &dfs->super);
/* Send the byte object to our local daemon for storage */
buffer = OBJ_NEW(opal_buffer_t);
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->cmd, 1, ORTE_DFS_CMD_T))) {
ORTE_ERROR_LOG(rc);
goto error;
}
/* include the request id */
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->id, 1, OPAL_UINT64))) {
ORTE_ERROR_LOG(rc);
goto error;
}
/* add my name */
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) {
ORTE_ERROR_LOG(rc);
goto error;
}
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->bo, 1, OPAL_BYTE_OBJECT))) {
ORTE_ERROR_LOG(rc);
goto error;
}
/* send it */
if (0 > (rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_DAEMON, buffer,
ORTE_RML_TAG_DFS_CMD, 0,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
goto error;
}
return;
error:
OBJ_RELEASE(buffer);
opal_list_remove_item(&requests, &dfs->super);
if (NULL != dfs->post_cbfunc) {
dfs->post_cbfunc(dfs->cbdata);
}
OBJ_RELEASE(dfs);
}
static void dfs_post_file_map(opal_byte_object_t *bo,
orte_dfs_post_callback_fn_t cbfunc,
void *cbdata)
{
orte_dfs_request_t *dfs;
dfs = OBJ_NEW(orte_dfs_request_t);
dfs->cmd = ORTE_DFS_POST_CMD;
dfs->bo = bo;
dfs->post_cbfunc = cbfunc;
dfs->cbdata = cbdata;
/* post it for processing */
ORTE_DFS_POST_REQUEST(dfs, process_posts);
}
static void process_getfm(int fd, short args, void *cbdata)
{
orte_dfs_request_t *dfs = (orte_dfs_request_t*)cbdata;
opal_buffer_t *buffer;
int rc;
/* we will get confirmation in our receive function, so
* add this request to our list */
dfs->id = req_id++;
opal_list_append(&requests, &dfs->super);
/* Send the request to our local daemon */
buffer = OBJ_NEW(opal_buffer_t);
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->cmd, 1, ORTE_DFS_CMD_T))) {
ORTE_ERROR_LOG(rc);
goto error;
}
/* include the request id */
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->id, 1, OPAL_UINT64))) {
ORTE_ERROR_LOG(rc);
goto error;
}
/* and the target */
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->target, 1, ORTE_NAME))) {
ORTE_ERROR_LOG(rc);
goto error;
}
/* send it */
if (0 > (rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_DAEMON, buffer,
ORTE_RML_TAG_DFS_CMD, 0,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
goto error;
}
return;
error:
OBJ_RELEASE(buffer);
opal_list_remove_item(&requests, &dfs->super);
if (NULL != dfs->fm_cbfunc) {
dfs->fm_cbfunc(NULL, dfs->cbdata);
}
OBJ_RELEASE(dfs);
}
static void dfs_get_file_map(orte_process_name_t *target,
orte_dfs_fm_callback_fn_t cbfunc,
void *cbdata)
{
orte_dfs_request_t *dfs;
dfs = OBJ_NEW(orte_dfs_request_t);
dfs->cmd = ORTE_DFS_GETFM_CMD;
dfs->target.jobid = target->jobid;
dfs->target.vpid = target->vpid;
dfs->fm_cbfunc = cbfunc;
dfs->cbdata = cbdata;
/* post it for processing */
ORTE_DFS_POST_REQUEST(dfs, process_getfm);
}
static void dfs_load_file_maps(orte_jobid_t jobid,
opal_byte_object_t *bo,
orte_dfs_load_callback_fn_t cbfunc,
void *cbdata)
{
/* apps don't store file maps */
if (NULL != cbfunc) {
cbfunc(cbdata);
}
}
static void dfs_purge_file_maps(orte_jobid_t jobid,
orte_dfs_purge_callback_fn_t cbfunc,
void *cbdata)
{
/* apps don't store file maps */
if (NULL != cbfunc) {
cbfunc(cbdata);
}
}

View File

@ -62,16 +62,22 @@ typedef struct {
opal_event_t ev;
uint64_t id;
orte_dfs_cmd_t cmd;
orte_process_name_t target;
char *uri;
int local_fd;
int remote_fd;
uint8_t *read_buffer;
long read_length;
opal_byte_object_t *bo;
orte_dfs_open_callback_fn_t open_cbfunc;
orte_dfs_close_callback_fn_t close_cbfunc;
orte_dfs_size_callback_fn_t size_cbfunc;
orte_dfs_seek_callback_fn_t seek_cbfunc;
orte_dfs_read_callback_fn_t read_cbfunc;
orte_dfs_post_callback_fn_t post_cbfunc;
orte_dfs_fm_callback_fn_t fm_cbfunc;
orte_dfs_load_callback_fn_t load_cbfunc;
orte_dfs_purge_callback_fn_t purge_cbfunc;
void *cbdata;
} orte_dfs_request_t;
OBJ_CLASS_DECLARATION(orte_dfs_request_t);
@ -80,7 +86,7 @@ OBJ_CLASS_DECLARATION(orte_dfs_request_t);
do { \
opal_event_set(orte_event_base, &((d)->ev), \
-1, OPAL_EV_WRITE, (cb), (d)); \
opal_event_set_priority(&((d)->ev), ORTE_MSG_PRI); \
opal_event_set_priority(&((d)->ev), ORTE_SYS_PRI); \
opal_event_active(&((d)->ev), OPAL_EV_WRITE, 1); \
} while(0);

View File

@ -22,12 +22,12 @@
int orte_dfs_base_close(void)
{
/* if not initialized, then skip this action. */
if( !orte_dfs_base.initialized ) {
if (!orte_dfs_base.initialized) {
return ORTE_SUCCESS;
}
/* Close selected component */
if( NULL != orte_dfs.finalize ) {
if (NULL != orte_dfs.finalize) {
orte_dfs.finalize();
}

View File

@ -42,7 +42,19 @@ opal_list_t orte_dfs_base_components_available;
orte_dfs_base_t orte_dfs_base;
orte_dfs_base_module_t orte_dfs;
orte_dfs_base_module_t orte_dfs = {
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL,
NULL
};
/**
* Function for finding and opening either all MCA components, or the one
@ -105,6 +117,10 @@ static void req_const(orte_dfs_request_t *dfs)
dfs->size_cbfunc = NULL;
dfs->seek_cbfunc = NULL;
dfs->read_cbfunc = NULL;
dfs->post_cbfunc = NULL;
dfs->fm_cbfunc = NULL;
dfs->load_cbfunc = NULL;
dfs->purge_cbfunc = NULL;
dfs->cbdata = NULL;
}
static void req_dest(orte_dfs_request_t *dfs)
@ -116,3 +132,36 @@ static void req_dest(orte_dfs_request_t *dfs)
OBJ_CLASS_INSTANCE(orte_dfs_request_t,
opal_list_item_t,
req_const, req_dest);
static void jobfm_const(orte_dfs_jobfm_t *fm)
{
OBJ_CONSTRUCT(&fm->maps, opal_list_t);
}
static void jobfm_dest(orte_dfs_jobfm_t *fm)
{
opal_list_item_t *item;
while (NULL != (item = opal_list_remove_first(&fm->maps))) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&fm->maps);
}
OBJ_CLASS_INSTANCE(orte_dfs_jobfm_t,
opal_list_item_t,
jobfm_const, jobfm_dest);
static void vpidfm_const(orte_dfs_vpidfm_t *fm)
{
fm->fm.bytes = NULL;
fm->fm.size = 0;
fm->num_entries = 0;
}
static void vpidfm_dest(orte_dfs_vpidfm_t *fm)
{
if (NULL != fm->fm.bytes) {
free(fm->fm.bytes);
}
}
OBJ_CLASS_INSTANCE(orte_dfs_vpidfm_t,
opal_list_item_t,
vpidfm_const, vpidfm_dest);

View File

@ -24,29 +24,30 @@
int orte_dfs_base_select(void)
{
int exit_status = OPAL_SUCCESS;
int exit_status = ORTE_SUCCESS;
orte_dfs_base_component_t *best_component = NULL;
orte_dfs_base_module_t *best_module = NULL;
/*
* Select the best component
*/
if( OPAL_SUCCESS != mca_base_select("dfs", orte_dfs_base.output,
if (OPAL_SUCCESS != mca_base_select("dfs", orte_dfs_base.output,
&orte_dfs_base.components_available,
(mca_base_module_t **) &best_module,
(mca_base_component_t **) &best_component) ) {
/* This will only happen if no component was selected */
exit_status = ORTE_ERROR;
goto cleanup;
(mca_base_component_t **) &best_component)) {
/* This will only happen if no component was selected, which
* is okay - we don't have to select anything
*/
return ORTE_SUCCESS;
}
/* Save the winner */
orte_dfs = *best_module;
/* Initialize the winner */
if (NULL != best_module) {
if (OPAL_SUCCESS != orte_dfs.init()) {
exit_status = OPAL_ERROR;
if (NULL != best_module && NULL != orte_dfs.init) {
if (ORTE_SUCCESS != orte_dfs.init()) {
exit_status = ORTE_ERROR;
goto cleanup;
}
}

View File

@ -12,6 +12,7 @@
#define ORTE_MCA_DFS_H
#include "orte_config.h"
#include "orte/types.h"
#ifdef HAVE_FCNTL_H
#include <fcntl.h>
@ -106,20 +107,54 @@ typedef void (*orte_dfs_base_module_read_fn_t)(int fd, uint8_t *buffer,
void *cbdata);
/* Post a file map so others may access it */
typedef void (*orte_dfs_base_module_post_file_map_fn_t)(opal_byte_object_t *bo,
orte_dfs_post_callback_fn_t cbfunc,
void *cbdata);
/* Get the file map for a process
*
* Returns the file map associated with the specified process name. If
* NULL is provided, then all known process maps will be returned in the
* byte object. It is the responsibility of the caller to unpack it, so
* applications are free to specify whatever constitutes a "file map" that
* suits their purposes
*/
typedef void (*orte_dfs_base_module_get_file_map_fn_t)(orte_process_name_t *target,
orte_dfs_fm_callback_fn_t cbfunc,
void *cbdata);
/* Load file maps for a job
*/
typedef void (*orte_dfs_base_module_load_file_maps_fn_t)(orte_jobid_t jobid,
opal_byte_object_t *bo,
orte_dfs_load_callback_fn_t cbfunc,
void *cbdata);
/* Purge file maps for a job */
typedef void (*orte_dfs_base_module_purge_file_maps_fn_t)(orte_jobid_t jobid,
orte_dfs_purge_callback_fn_t cbfunc,
void *cbdata);
/*
* Module Structure
*/
struct orte_dfs_base_module_1_0_0_t {
/** Initialization Function */
orte_dfs_base_module_init_fn_t init;
orte_dfs_base_module_init_fn_t init;
/** Finalization Function */
orte_dfs_base_module_finalize_fn_t finalize;
orte_dfs_base_module_finalize_fn_t finalize;
orte_dfs_base_module_open_fn_t open;
orte_dfs_base_module_close_fn_t close;
orte_dfs_base_module_get_file_size_fn_t get_file_size;
orte_dfs_base_module_seek_fn_t seek;
orte_dfs_base_module_read_fn_t read;
orte_dfs_base_module_open_fn_t open;
orte_dfs_base_module_close_fn_t close;
orte_dfs_base_module_get_file_size_fn_t get_file_size;
orte_dfs_base_module_seek_fn_t seek;
orte_dfs_base_module_read_fn_t read;
orte_dfs_base_module_post_file_map_fn_t post_file_map;
orte_dfs_base_module_get_file_map_fn_t get_file_map;
orte_dfs_base_module_load_file_maps_fn_t load_file_maps;
orte_dfs_base_module_purge_file_maps_fn_t purge_file_maps;
};
typedef struct orte_dfs_base_module_1_0_0_t orte_dfs_base_module_1_0_0_t;
typedef orte_dfs_base_module_1_0_0_t orte_dfs_base_module_t;

View File

@ -13,6 +13,8 @@
#include "orte_config.h"
#include "opal/dss/dss_types.h"
BEGIN_C_DECLS
typedef uint8_t orte_dfs_cmd_t;
@ -23,6 +25,26 @@ typedef uint8_t orte_dfs_cmd_t;
#define ORTE_DFS_SIZE_CMD 3
#define ORTE_DFS_SEEK_CMD 4
#define ORTE_DFS_READ_CMD 5
#define ORTE_DFS_POST_CMD 6
#define ORTE_DFS_GETFM_CMD 7
#define ORTE_DFS_LOAD_CMD 8
#define ORTE_DFS_PURGE_CMD 9
/* file maps */
typedef struct {
opal_list_item_t super;
orte_jobid_t jobid;
opal_list_t maps;
} orte_dfs_jobfm_t;
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_dfs_jobfm_t);
typedef struct {
opal_list_item_t super;
orte_vpid_t vpid;
int num_entries;
opal_byte_object_t fm;
} orte_dfs_vpidfm_t;
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_dfs_vpidfm_t);
typedef void (*orte_dfs_open_callback_fn_t)(int fd, void *cbdata);
@ -36,6 +58,14 @@ typedef void (*orte_dfs_read_callback_fn_t)(long status,
uint8_t *buffer,
void *cbdata);
typedef void (*orte_dfs_post_callback_fn_t)(void *cbdata);
typedef void (*orte_dfs_fm_callback_fn_t)(opal_byte_object_t *bo, void *cbdata);
typedef void (*orte_dfs_load_callback_fn_t)(void *cbdata);
typedef void (*orte_dfs_purge_callback_fn_t)(void *cbdata);
END_C_DECLS
#endif

View File

@ -67,7 +67,19 @@ static void dfs_read(int fd, uint8_t *buffer,
long length,
orte_dfs_read_callback_fn_t cbfunc,
void *cbdata);
static void dfs_post_file_map(opal_byte_object_t *bo,
orte_dfs_post_callback_fn_t cbfunc,
void *cbdata);
static void dfs_get_file_map(orte_process_name_t *target,
orte_dfs_fm_callback_fn_t cbfunc,
void *cbdata);
static void dfs_load_file_maps(orte_jobid_t jobid,
opal_byte_object_t *bo,
orte_dfs_load_callback_fn_t cbfunc,
void *cbdata);
static void dfs_purge_file_maps(orte_jobid_t jobid,
orte_dfs_purge_callback_fn_t cbfunc,
void *cbdata);
/******************
* Daemon/HNP module
******************/
@ -78,10 +90,14 @@ orte_dfs_base_module_t orte_dfs_orted_module = {
dfs_close,
dfs_get_file_size,
dfs_seek,
dfs_read
dfs_read,
dfs_post_file_map,
dfs_get_file_map,
dfs_load_file_maps,
dfs_purge_file_maps
};
static opal_list_t requests, active_files;
static opal_list_t requests, active_files, file_maps;
static int local_fd = 0;
static uint64_t req_id = 0;
static void recv_dfs_cmd(int status, orte_process_name_t* sender,
@ -97,6 +113,7 @@ static int init(void)
OBJ_CONSTRUCT(&requests, opal_list_t);
OBJ_CONSTRUCT(&active_files, opal_list_t);
OBJ_CONSTRUCT(&file_maps, opal_list_t);
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_DFS_CMD,
ORTE_RML_PERSISTENT,
@ -128,6 +145,10 @@ static int finalize(void)
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&active_files);
while (NULL != (item = opal_list_remove_first(&file_maps))) {
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&file_maps);
return ORTE_SUCCESS;
}
@ -794,6 +815,350 @@ static void dfs_read(int fd, uint8_t *buffer,
ORTE_DFS_POST_REQUEST(dfs, process_reads);
}
static void process_posts(int fd, short args, void *cbdata)
{
orte_dfs_request_t *dfs = (orte_dfs_request_t*)cbdata;
orte_dfs_jobfm_t *jptr, *jfm;
orte_dfs_vpidfm_t *vptr, *vfm;
opal_list_item_t *item;
int rc;
opal_buffer_t buf;
opal_output_verbose(1, orte_dfs_base.output,
"%s posting file map for target %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&dfs->target));
/* lookup the job map */
jfm = NULL;
for (item = opal_list_get_first(&file_maps);
item != opal_list_get_end(&file_maps);
item = opal_list_get_next(item)) {
jptr = (orte_dfs_jobfm_t*)item;
if (jptr->jobid == dfs->target.jobid) {
jfm = jptr;
break;
}
}
if (NULL == jfm) {
/* add it */
jfm = OBJ_NEW(orte_dfs_jobfm_t);
jfm->jobid = dfs->target.jobid;
opal_list_append(&file_maps, &jfm->super);
}
/* see if we already have an entry for this source */
vfm = NULL;
for (item = opal_list_get_first(&jfm->maps);
item != opal_list_get_end(&jfm->maps);
item = opal_list_get_next(item)) {
vptr = (orte_dfs_vpidfm_t*)item;
if (vptr->vpid == dfs->target.vpid) {
vfm = vptr;
break;
}
}
if (NULL == vfm) {
/* add it */
vfm = OBJ_NEW(orte_dfs_vpidfm_t);
vfm->vpid = dfs->target.vpid;
opal_list_append(&jfm->maps, &vfm->super);
}
/* add this entry to any prior one */
OBJ_CONSTRUCT(&buf, opal_buffer_t);
opal_dss.load(&buf, vfm->fm.bytes, vfm->fm.size);
if (OPAL_SUCCESS != (rc = opal_dss.pack(&buf, &dfs->bo, 1, OPAL_BYTE_OBJECT))) {
ORTE_ERROR_LOG(rc);
goto complete;
}
vfm->num_entries++;
opal_dss.unload(&buf, (void**)&vfm->fm.bytes, &vfm->fm.size);
opal_output_verbose(1, orte_dfs_base.output,
"%s target %s now has %d entries",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&dfs->target),
vfm->num_entries);
complete:
if (NULL != dfs->post_cbfunc) {
dfs->post_cbfunc(dfs->cbdata);
}
OBJ_RELEASE(dfs);
}
static void dfs_post_file_map(opal_byte_object_t *bo,
orte_dfs_post_callback_fn_t cbfunc,
void *cbdata)
{
orte_dfs_request_t *dfs;
dfs = OBJ_NEW(orte_dfs_request_t);
dfs->cmd = ORTE_DFS_POST_CMD;
dfs->target.jobid = ORTE_PROC_MY_NAME->jobid;
dfs->target.vpid = ORTE_PROC_MY_NAME->vpid;
dfs->bo = bo;
dfs->post_cbfunc = cbfunc;
dfs->cbdata = cbdata;
/* post it for processing */
ORTE_DFS_POST_REQUEST(dfs, process_posts);
}
static void get_job_maps(orte_dfs_jobfm_t *jfm,
orte_vpid_t vpid,
opal_buffer_t *buf)
{
orte_dfs_vpidfm_t *vfm;
opal_list_item_t *item;
opal_byte_object_t *boptr;
int rc;
/* if the target vpid is WILDCARD, then process
* data for all vpids - else, find the one
*/
for (item = opal_list_get_first(&jfm->maps);
item != opal_list_get_end(&jfm->maps);
item = opal_list_get_next(item)) {
vfm = (orte_dfs_vpidfm_t*)item;
if (ORTE_VPID_WILDCARD == vpid ||
vfm->vpid == vpid) {
/* indicate data from this vpid */
if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &vfm->vpid, 1, ORTE_VPID))) {
ORTE_ERROR_LOG(rc);
return;
}
/* pack the number of entries in its byte object */
if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &vfm->num_entries, 1, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
return;
}
/* pack the byte object */
boptr = &vfm->fm;
if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &boptr, 1, OPAL_BYTE_OBJECT))) {
ORTE_ERROR_LOG(rc);
return;
}
}
}
}
static void process_getfm(int fd, short args, void *cbdata)
{
orte_dfs_request_t *dfs = (orte_dfs_request_t*)cbdata;
orte_dfs_jobfm_t *jfm;
opal_buffer_t buf;
opal_list_item_t *item;
opal_byte_object_t bo;
/* prep the collection bucket */
OBJ_CONSTRUCT(&buf, opal_buffer_t);
/* if the target job is WILDCARD, then process
* data for all jobids - else, find the one
*/
for (item = opal_list_get_first(&file_maps);
item != opal_list_get_end(&file_maps);
item = opal_list_get_next(item)) {
jfm = (orte_dfs_jobfm_t*)item;
if (ORTE_JOBID_WILDCARD == dfs->target.jobid ||
jfm->jobid == dfs->target.jobid) {
get_job_maps(jfm, dfs->target.vpid, &buf);
}
}
/* unload the result */
opal_dss.unload(&buf, (void**)&bo.bytes, &bo.size);
OBJ_DESTRUCT(&buf);
if (NULL != dfs->fm_cbfunc) {
dfs->fm_cbfunc(&bo, dfs->cbdata);
}
if (NULL != bo.bytes) {
free(bo.bytes);
}
OBJ_RELEASE(dfs);
}
static void dfs_get_file_map(orte_process_name_t *target,
orte_dfs_fm_callback_fn_t cbfunc,
void *cbdata)
{
orte_dfs_request_t *dfs;
opal_output_verbose(1, orte_dfs_base.output,
"%s get file map for %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(target));
dfs = OBJ_NEW(orte_dfs_request_t);
dfs->cmd = ORTE_DFS_GETFM_CMD;
dfs->target.jobid = target->jobid;
dfs->target.vpid = target->vpid;
dfs->fm_cbfunc = cbfunc;
dfs->cbdata = cbdata;
/* post it for processing */
ORTE_DFS_POST_REQUEST(dfs, process_getfm);
}
static void process_load(int fd, short args, void *cbdata)
{
orte_dfs_request_t *dfs = (orte_dfs_request_t*)cbdata;
opal_list_item_t *item;
orte_dfs_jobfm_t *jfm, *jptr;
orte_dfs_vpidfm_t *vfm;
orte_vpid_t vpid;
int32_t entries;
opal_byte_object_t *boptr;
opal_buffer_t buf;
int cnt;
int rc;
/* see if we already have a tracker for this job */
jfm = NULL;
for (item = opal_list_get_first(&file_maps);
item != opal_list_get_end(&file_maps);
item = opal_list_get_next(item)) {
jptr = (orte_dfs_jobfm_t*)item;
if (jptr->jobid == dfs->target.jobid) {
jfm = jptr;
break;
}
}
if (NULL != jfm) {
/* need to purge it first */
while (NULL != (item = opal_list_remove_first(&jfm->maps))) {
OBJ_RELEASE(item);
}
} else {
jfm = OBJ_NEW(orte_dfs_jobfm_t);
jfm->jobid = dfs->target.jobid;
opal_list_append(&file_maps, &jfm->super);
}
/* setup to unpack the byte object */
OBJ_CONSTRUCT(&buf, opal_buffer_t);
opal_dss.load(&buf, dfs->bo->bytes, dfs->bo->size);
/* unpack the buffer */
cnt = 1;
while (OPAL_SUCCESS == opal_dss.unpack(&buf, &vpid, &cnt, ORTE_VPID)) {
/* unpack the number of entries contained in the byte object */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, &entries, &cnt, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
goto complete;
}
/* unpack the byte object */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, &boptr, &cnt, OPAL_BYTE_OBJECT))) {
ORTE_ERROR_LOG(rc);
goto complete;
}
/* create the entry */
vfm = OBJ_NEW(orte_dfs_vpidfm_t);
vfm->vpid = vpid;
vfm->num_entries = entries;
vfm->fm.bytes = boptr->bytes;
boptr->bytes = NULL;
vfm->fm.size = boptr->size;
opal_list_append(&jfm->maps, &vfm->super);
/* cleanup */
free(boptr);
}
complete:
/* must reload the byte object as it belongs to
* our original caller - the load function will
* have cleared it
*/
opal_dss.unload(&buf, (void**)&dfs->bo->bytes, &dfs->bo->size);
OBJ_DESTRUCT(&buf);
if (NULL != dfs->load_cbfunc) {
dfs->load_cbfunc(dfs->cbdata);
}
OBJ_RELEASE(dfs);
}
static void dfs_load_file_maps(orte_jobid_t jobid,
opal_byte_object_t *bo,
orte_dfs_load_callback_fn_t cbfunc,
void *cbdata)
{
orte_dfs_request_t *dfs;
opal_output_verbose(1, orte_dfs_base.output,
"%s loading file maps for %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(jobid));
dfs = OBJ_NEW(orte_dfs_request_t);
dfs->cmd = ORTE_DFS_LOAD_CMD;
dfs->target.jobid = jobid;
dfs->bo = bo;
dfs->load_cbfunc = cbfunc;
dfs->cbdata = cbdata;
/* post it for processing */
ORTE_DFS_POST_REQUEST(dfs, process_load);
}
static void process_purge(int fd, short args, void *cbdata)
{
orte_dfs_request_t *dfs = (orte_dfs_request_t*)cbdata;
opal_list_item_t *item;
orte_dfs_jobfm_t *jfm, *jptr;
/* find the job tracker */
jfm = NULL;
for (item = opal_list_get_first(&file_maps);
item != opal_list_get_end(&file_maps);
item = opal_list_get_next(item)) {
jptr = (orte_dfs_jobfm_t*)item;
if (jptr->jobid == dfs->target.jobid) {
jfm = jptr;
break;
}
}
if (NULL == jptr) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
} else {
/* remove it from the list */
opal_list_remove_item(&file_maps, &jptr->super);
/* the destructor will release the list of maps
* in the jobfm object
*/
OBJ_RELEASE(jptr);
}
if (NULL != dfs->purge_cbfunc) {
dfs->purge_cbfunc(dfs->cbdata);
}
OBJ_RELEASE(dfs);
}
static void dfs_purge_file_maps(orte_jobid_t jobid,
orte_dfs_purge_callback_fn_t cbfunc,
void *cbdata)
{
orte_dfs_request_t *dfs;
opal_output_verbose(1, orte_dfs_base.output,
"%s purging file maps for job %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_JOBID_PRINT(jobid));
dfs = OBJ_NEW(orte_dfs_request_t);
dfs->cmd = ORTE_DFS_PURGE_CMD;
dfs->target.jobid = jobid;
dfs->purge_cbfunc = cbfunc;
dfs->cbdata = cbdata;
/* post it for processing */
ORTE_DFS_POST_REQUEST(dfs, process_purge);
}
/* receives take place in an event, so we are free to process
* the request list without fear of getting things out-of-order
@ -813,6 +1178,11 @@ static void recv_dfs_cmd(int status, orte_process_name_t* sender,
uint64_t rid;
int whence;
struct stat buf;
orte_process_name_t source;
opal_byte_object_t *bo;
orte_dfs_request_t *dfs;
orte_dfs_jobfm_t *jfm;
opal_buffer_t *answer, bucket;
/* unpack the command */
cnt = 1;
@ -860,25 +1230,25 @@ static void recv_dfs_cmd(int status, orte_process_name_t* sender,
opal_list_append(&active_files, &trk->super);
answer_open:
/* construct the return message */
buffer = OBJ_NEW(opal_buffer_t);
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &cmd, 1, ORTE_DFS_CMD_T))) {
answer = OBJ_NEW(opal_buffer_t);
if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &cmd, 1, ORTE_DFS_CMD_T))) {
ORTE_ERROR_LOG(rc);
return;
}
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &rid, 1, OPAL_UINT64))) {
if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &rid, 1, OPAL_UINT64))) {
ORTE_ERROR_LOG(rc);
return;
}
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &my_fd, 1, OPAL_INT))) {
if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &my_fd, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
return;
}
/* send it */
if (0 > (rc = orte_rml.send_buffer_nb(sender, buffer,
if (0 > (rc = orte_rml.send_buffer_nb(sender, answer,
ORTE_RML_TAG_DFS_DATA, 0,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buffer);
OBJ_RELEASE(answer);
return;
}
break;
@ -940,25 +1310,25 @@ static void recv_dfs_cmd(int status, orte_process_name_t* sender,
}
}
/* construct the return message */
buffer = OBJ_NEW(opal_buffer_t);
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &cmd, 1, ORTE_DFS_CMD_T))) {
answer = OBJ_NEW(opal_buffer_t);
if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &cmd, 1, ORTE_DFS_CMD_T))) {
ORTE_ERROR_LOG(rc);
return;
}
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &rid, 1, OPAL_UINT64))) {
if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &rid, 1, OPAL_UINT64))) {
ORTE_ERROR_LOG(rc);
return;
}
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &i64, 1, OPAL_INT64))) {
if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &i64, 1, OPAL_INT64))) {
ORTE_ERROR_LOG(rc);
return;
}
/* send it */
if (0 > (rc = orte_rml.send_buffer_nb(sender, buffer,
if (0 > (rc = orte_rml.send_buffer_nb(sender, answer,
ORTE_RML_TAG_DFS_DATA, 0,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buffer);
OBJ_RELEASE(answer);
return;
}
break;
@ -1033,17 +1403,17 @@ static void recv_dfs_cmd(int status, orte_process_name_t* sender,
}
}
/* construct the return message */
buffer = OBJ_NEW(opal_buffer_t);
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &cmd, 1, ORTE_DFS_CMD_T))) {
answer = OBJ_NEW(opal_buffer_t);
if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &cmd, 1, ORTE_DFS_CMD_T))) {
ORTE_ERROR_LOG(rc);
return;
}
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &rid, 1, OPAL_UINT64))) {
if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &rid, 1, OPAL_UINT64))) {
ORTE_ERROR_LOG(rc);
return;
}
/* return the offset/status */
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &bytes_read, 1, OPAL_INT64))) {
if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &bytes_read, 1, OPAL_INT64))) {
ORTE_ERROR_LOG(rc);
return;
}
@ -1053,11 +1423,11 @@ static void recv_dfs_cmd(int status, orte_process_name_t* sender,
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(long)bytes_read,
ORTE_NAME_PRINT(sender));
if (0 > (rc = orte_rml.send_buffer_nb(sender, buffer,
if (0 > (rc = orte_rml.send_buffer_nb(sender, answer,
ORTE_RML_TAG_DFS_DATA, 0,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buffer);
OBJ_RELEASE(answer);
return;
}
break;
@ -1113,23 +1483,23 @@ static void recv_dfs_cmd(int status, orte_process_name_t* sender,
}
answer_read:
/* construct the return message */
buffer = OBJ_NEW(opal_buffer_t);
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &cmd, 1, ORTE_DFS_CMD_T))) {
answer = OBJ_NEW(opal_buffer_t);
if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &cmd, 1, ORTE_DFS_CMD_T))) {
ORTE_ERROR_LOG(rc);
return;
}
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &rid, 1, OPAL_UINT64))) {
if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &rid, 1, OPAL_UINT64))) {
ORTE_ERROR_LOG(rc);
return;
}
/* include the number of bytes read */
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &bytes_read, 1, OPAL_INT64))) {
if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &bytes_read, 1, OPAL_INT64))) {
ORTE_ERROR_LOG(rc);
return;
}
/* include the bytes read */
if (0 < bytes_read) {
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, read_buf, bytes_read, OPAL_UINT8))) {
if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, read_buf, bytes_read, OPAL_UINT8))) {
ORTE_ERROR_LOG(rc);
return;
}
@ -1140,11 +1510,11 @@ static void recv_dfs_cmd(int status, orte_process_name_t* sender,
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(long)bytes_read,
ORTE_NAME_PRINT(sender));
if (0 > (rc = orte_rml.send_buffer_nb(sender, buffer,
if (0 > (rc = orte_rml.send_buffer_nb(sender, answer,
ORTE_RML_TAG_DFS_DATA, 0,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buffer);
OBJ_RELEASE(answer);
return;
}
if (NULL != read_buf) {
@ -1152,6 +1522,117 @@ static void recv_dfs_cmd(int status, orte_process_name_t* sender,
}
break;
case ORTE_DFS_POST_CMD:
/* unpack their request id */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) {
ORTE_ERROR_LOG(rc);
return;
}
/* unpack the name of the source of this data */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &source, &cnt, ORTE_NAME))) {
ORTE_ERROR_LOG(rc);
return;
}
/* unpack their byte object */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &bo, &cnt, OPAL_BYTE_OBJECT))) {
ORTE_ERROR_LOG(rc);
goto answer_read;
}
/* add the contents to the storage for this process */
dfs = OBJ_NEW(orte_dfs_request_t);
dfs->target.jobid = source.jobid;
dfs->target.vpid = source.vpid;
dfs->bo = bo;
dfs->post_cbfunc = NULL;
process_posts(0, 0, (void*)dfs);
/* return an ack */
answer = OBJ_NEW(opal_buffer_t);
if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &cmd, 1, ORTE_DFS_CMD_T))) {
ORTE_ERROR_LOG(rc);
return;
}
if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &rid, 1, OPAL_UINT64))) {
ORTE_ERROR_LOG(rc);
return;
}
if (0 > (rc = orte_rml.send_buffer_nb(sender, answer,
ORTE_RML_TAG_DFS_DATA, 0,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(answer);
}
if (NULL != bo->bytes) {
free(bo->bytes);
}
free(bo);
break;
case ORTE_DFS_GETFM_CMD:
/* unpack their request id */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) {
ORTE_ERROR_LOG(rc);
return;
}
/* unpack the target */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &source, &cnt, ORTE_NAME))) {
ORTE_ERROR_LOG(rc);
goto answer_read;
}
/* search our data tree for matches, assembling them
* into a byte object
*/
/* if the target job is WILDCARD, then process
* data for all jobids - else, find the one
*/
OBJ_CONSTRUCT(&bucket, opal_buffer_t);
for (item = opal_list_get_first(&file_maps);
item != opal_list_get_end(&file_maps);
item = opal_list_get_next(item)) {
jfm = (orte_dfs_jobfm_t*)item;
if (ORTE_JOBID_WILDCARD == source.jobid ||
jfm->jobid == source.jobid) {
get_job_maps(jfm, source.vpid, &bucket);
}
}
/* unload the result */
bo = (opal_byte_object_t*)malloc(sizeof(opal_byte_object_t));
opal_dss.unload(&bucket, (void**)&bo->bytes, &bo->size);
OBJ_DESTRUCT(&bucket);
opal_output_verbose(1, orte_dfs_base.output,
"%s getf-cmd: returning %d bytes to sender %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), bo->size,
ORTE_NAME_PRINT(sender));
/* construct the response */
answer = OBJ_NEW(opal_buffer_t);
if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &cmd, 1, ORTE_DFS_CMD_T))) {
ORTE_ERROR_LOG(rc);
return;
}
if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &rid, 1, OPAL_UINT64))) {
ORTE_ERROR_LOG(rc);
return;
}
if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &bo, 1, OPAL_BYTE_OBJECT))) {
ORTE_ERROR_LOG(rc);
return;
}
if (NULL != bo->bytes) {
free(bo->bytes);
}
free(bo);
if (0 > (rc = orte_rml.send_buffer_nb(sender, answer,
ORTE_RML_TAG_DFS_DATA, 0,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(answer);
}
break;
default:
opal_output(0, "ORTED:DFS:RECV_DFS WTF");
break;