diff --git a/orte/mca/dfs/app/dfs_app.c b/orte/mca/dfs/app/dfs_app.c index bdeb70c4a1..1feab2ee6a 100644 --- a/orte/mca/dfs/app/dfs_app.c +++ b/orte/mca/dfs/app/dfs_app.c @@ -60,6 +60,19 @@ static void dfs_read(int fd, uint8_t *buffer, long length, orte_dfs_read_callback_fn_t cbfunc, void *cbdata); +static void dfs_post_file_map(opal_byte_object_t *bo, + orte_dfs_post_callback_fn_t cbfunc, + void *cbdata); +static void dfs_get_file_map(orte_process_name_t *target, + orte_dfs_fm_callback_fn_t cbfunc, + void *cbdata); +static void dfs_load_file_maps(orte_jobid_t jobid, + opal_byte_object_t *bo, + orte_dfs_load_callback_fn_t cbfunc, + void *cbdata); +static void dfs_purge_file_maps(orte_jobid_t jobid, + orte_dfs_purge_callback_fn_t cbfunc, + void *cbdata); /****************** * APP module @@ -71,7 +84,11 @@ orte_dfs_base_module_t orte_dfs_app_module = { dfs_close, dfs_get_file_size, dfs_seek, - dfs_read + dfs_read, + dfs_post_file_map, + dfs_get_file_map, + dfs_load_file_maps, + dfs_purge_file_maps }; static opal_list_t requests, active_files; @@ -128,6 +145,7 @@ static void recv_dfs(int status, orte_process_name_t* sender, int64_t i64; uint64_t rid; orte_dfs_tracker_t *trk; + opal_byte_object_t *bo; /* unpack the command this message is responding to */ cnt = 1; @@ -352,6 +370,79 @@ static void recv_dfs(int status, orte_process_name_t* sender, OBJ_RELEASE(dfs); break; + case ORTE_DFS_POST_CMD: + /* unpack the request id for this read */ + cnt = 1; + if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) { + ORTE_ERROR_LOG(rc); + return; + } + /* search our list of requests to find the matching one */ + dfs = NULL; + for (item = opal_list_get_first(&requests); + item != opal_list_get_end(&requests); + item = opal_list_get_next(item)) { + dptr = (orte_dfs_request_t*)item; + if (dptr->id == rid) { + /* request was fulfilled, so remove it */ + opal_list_remove_item(&requests, item); + dfs = dptr; + break; + } + } + if (NULL == dfs) { + opal_output_verbose(1, orte_dfs_base.output, + "%s recvd post - no corresponding request found", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); + return; + } + if (NULL != dfs->post_cbfunc) { + dfs->post_cbfunc(dfs->cbdata); + } + OBJ_RELEASE(dfs); + break; + + case ORTE_DFS_GETFM_CMD: + /* unpack the request id for this read */ + cnt = 1; + if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) { + ORTE_ERROR_LOG(rc); + return; + } + /* search our list of requests to find the matching one */ + dfs = NULL; + for (item = opal_list_get_first(&requests); + item != opal_list_get_end(&requests); + item = opal_list_get_next(item)) { + dptr = (orte_dfs_request_t*)item; + if (dptr->id == rid) { + /* request was fulfilled, so remove it */ + opal_list_remove_item(&requests, item); + dfs = dptr; + break; + } + } + if (NULL == dfs) { + opal_output_verbose(1, orte_dfs_base.output, + "%s recvd getfm - no corresponding request found", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); + return; + } + /* unpack the byte object */ + cnt = 1; + if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &bo, &cnt, OPAL_BYTE_OBJECT))) { + ORTE_ERROR_LOG(rc); + return; + } + /* return it to caller */ + if (NULL != dfs->fm_cbfunc) { + dfs->fm_cbfunc(bo, dfs->cbdata); + } + OBJ_RELEASE(dfs); + break; + default: opal_output(0, "APP:DFS:RECV WTF"); break; @@ -382,7 +473,6 @@ static void open_local_file(orte_dfs_request_t *dfs) if (NULL != dfs->open_cbfunc) { dfs->open_cbfunc(dfs->remote_fd, dfs->cbdata); } - OBJ_RELEASE(dfs); return; } /* otherwise, create a tracker for this file */ @@ -1046,3 +1136,152 @@ static void dfs_read(int fd, uint8_t *buffer, /* post it for processing */ ORTE_DFS_POST_REQUEST(dfs, process_reads); } + +static void process_posts(int fd, short args, void *cbdata) +{ + orte_dfs_request_t *dfs = (orte_dfs_request_t*)cbdata; + opal_buffer_t *buffer; + int rc; + + /* we will get confirmation in our receive function, so + * add this request to our list */ + dfs->id = req_id++; + opal_list_append(&requests, &dfs->super); + + /* Send the byte object to our local daemon for storage */ + buffer = OBJ_NEW(opal_buffer_t); + if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->cmd, 1, ORTE_DFS_CMD_T))) { + ORTE_ERROR_LOG(rc); + goto error; + } + /* include the request id */ + if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->id, 1, OPAL_UINT64))) { + ORTE_ERROR_LOG(rc); + goto error; + } + /* add my name */ + if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) { + ORTE_ERROR_LOG(rc); + goto error; + } + if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->bo, 1, OPAL_BYTE_OBJECT))) { + ORTE_ERROR_LOG(rc); + goto error; + } + /* send it */ + if (0 > (rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_DAEMON, buffer, + ORTE_RML_TAG_DFS_CMD, 0, + orte_rml_send_callback, NULL))) { + ORTE_ERROR_LOG(rc); + goto error; + } + return; + + error: + OBJ_RELEASE(buffer); + opal_list_remove_item(&requests, &dfs->super); + if (NULL != dfs->post_cbfunc) { + dfs->post_cbfunc(dfs->cbdata); + } + OBJ_RELEASE(dfs); +} + +static void dfs_post_file_map(opal_byte_object_t *bo, + orte_dfs_post_callback_fn_t cbfunc, + void *cbdata) +{ + orte_dfs_request_t *dfs; + + dfs = OBJ_NEW(orte_dfs_request_t); + dfs->cmd = ORTE_DFS_POST_CMD; + dfs->bo = bo; + dfs->post_cbfunc = cbfunc; + dfs->cbdata = cbdata; + + /* post it for processing */ + ORTE_DFS_POST_REQUEST(dfs, process_posts); +} + +static void process_getfm(int fd, short args, void *cbdata) +{ + orte_dfs_request_t *dfs = (orte_dfs_request_t*)cbdata; + opal_buffer_t *buffer; + int rc; + + /* we will get confirmation in our receive function, so + * add this request to our list */ + dfs->id = req_id++; + opal_list_append(&requests, &dfs->super); + + /* Send the request to our local daemon */ + buffer = OBJ_NEW(opal_buffer_t); + if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->cmd, 1, ORTE_DFS_CMD_T))) { + ORTE_ERROR_LOG(rc); + goto error; + } + /* include the request id */ + if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->id, 1, OPAL_UINT64))) { + ORTE_ERROR_LOG(rc); + goto error; + } + /* and the target */ + if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->target, 1, ORTE_NAME))) { + ORTE_ERROR_LOG(rc); + goto error; + } + /* send it */ + if (0 > (rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_DAEMON, buffer, + ORTE_RML_TAG_DFS_CMD, 0, + orte_rml_send_callback, NULL))) { + ORTE_ERROR_LOG(rc); + goto error; + } + return; + + error: + OBJ_RELEASE(buffer); + opal_list_remove_item(&requests, &dfs->super); + if (NULL != dfs->fm_cbfunc) { + dfs->fm_cbfunc(NULL, dfs->cbdata); + } + OBJ_RELEASE(dfs); +} + +static void dfs_get_file_map(orte_process_name_t *target, + orte_dfs_fm_callback_fn_t cbfunc, + void *cbdata) +{ + orte_dfs_request_t *dfs; + + dfs = OBJ_NEW(orte_dfs_request_t); + dfs->cmd = ORTE_DFS_GETFM_CMD; + dfs->target.jobid = target->jobid; + dfs->target.vpid = target->vpid; + dfs->fm_cbfunc = cbfunc; + dfs->cbdata = cbdata; + + /* post it for processing */ + ORTE_DFS_POST_REQUEST(dfs, process_getfm); +} + +static void dfs_load_file_maps(orte_jobid_t jobid, + opal_byte_object_t *bo, + orte_dfs_load_callback_fn_t cbfunc, + void *cbdata) +{ + /* apps don't store file maps */ + if (NULL != cbfunc) { + cbfunc(cbdata); + } +} + +static void dfs_purge_file_maps(orte_jobid_t jobid, + orte_dfs_purge_callback_fn_t cbfunc, + void *cbdata) +{ + /* apps don't store file maps */ + if (NULL != cbfunc) { + cbfunc(cbdata); + } +} + diff --git a/orte/mca/dfs/base/base.h b/orte/mca/dfs/base/base.h index 71b7ec2d37..b93b67a102 100644 --- a/orte/mca/dfs/base/base.h +++ b/orte/mca/dfs/base/base.h @@ -62,16 +62,22 @@ typedef struct { opal_event_t ev; uint64_t id; orte_dfs_cmd_t cmd; + orte_process_name_t target; char *uri; int local_fd; int remote_fd; uint8_t *read_buffer; long read_length; + opal_byte_object_t *bo; orte_dfs_open_callback_fn_t open_cbfunc; orte_dfs_close_callback_fn_t close_cbfunc; orte_dfs_size_callback_fn_t size_cbfunc; orte_dfs_seek_callback_fn_t seek_cbfunc; orte_dfs_read_callback_fn_t read_cbfunc; + orte_dfs_post_callback_fn_t post_cbfunc; + orte_dfs_fm_callback_fn_t fm_cbfunc; + orte_dfs_load_callback_fn_t load_cbfunc; + orte_dfs_purge_callback_fn_t purge_cbfunc; void *cbdata; } orte_dfs_request_t; OBJ_CLASS_DECLARATION(orte_dfs_request_t); @@ -80,7 +86,7 @@ OBJ_CLASS_DECLARATION(orte_dfs_request_t); do { \ opal_event_set(orte_event_base, &((d)->ev), \ -1, OPAL_EV_WRITE, (cb), (d)); \ - opal_event_set_priority(&((d)->ev), ORTE_MSG_PRI); \ + opal_event_set_priority(&((d)->ev), ORTE_SYS_PRI); \ opal_event_active(&((d)->ev), OPAL_EV_WRITE, 1); \ } while(0); diff --git a/orte/mca/dfs/base/dfs_base_close.c b/orte/mca/dfs/base/dfs_base_close.c index 4b9eaa46dd..8bb1036980 100644 --- a/orte/mca/dfs/base/dfs_base_close.c +++ b/orte/mca/dfs/base/dfs_base_close.c @@ -22,12 +22,12 @@ int orte_dfs_base_close(void) { /* if not initialized, then skip this action. */ - if( !orte_dfs_base.initialized ) { + if (!orte_dfs_base.initialized) { return ORTE_SUCCESS; } /* Close selected component */ - if( NULL != orte_dfs.finalize ) { + if (NULL != orte_dfs.finalize) { orte_dfs.finalize(); } diff --git a/orte/mca/dfs/base/dfs_base_open.c b/orte/mca/dfs/base/dfs_base_open.c index 314c27a0c9..fca2e952e8 100644 --- a/orte/mca/dfs/base/dfs_base_open.c +++ b/orte/mca/dfs/base/dfs_base_open.c @@ -42,7 +42,19 @@ opal_list_t orte_dfs_base_components_available; orte_dfs_base_t orte_dfs_base; -orte_dfs_base_module_t orte_dfs; +orte_dfs_base_module_t orte_dfs = { + NULL, + NULL, + NULL, + NULL, + NULL, + NULL, + NULL, + NULL, + NULL, + NULL, + NULL +}; /** * Function for finding and opening either all MCA components, or the one @@ -105,6 +117,10 @@ static void req_const(orte_dfs_request_t *dfs) dfs->size_cbfunc = NULL; dfs->seek_cbfunc = NULL; dfs->read_cbfunc = NULL; + dfs->post_cbfunc = NULL; + dfs->fm_cbfunc = NULL; + dfs->load_cbfunc = NULL; + dfs->purge_cbfunc = NULL; dfs->cbdata = NULL; } static void req_dest(orte_dfs_request_t *dfs) @@ -116,3 +132,36 @@ static void req_dest(orte_dfs_request_t *dfs) OBJ_CLASS_INSTANCE(orte_dfs_request_t, opal_list_item_t, req_const, req_dest); + +static void jobfm_const(orte_dfs_jobfm_t *fm) +{ + OBJ_CONSTRUCT(&fm->maps, opal_list_t); +} +static void jobfm_dest(orte_dfs_jobfm_t *fm) +{ + opal_list_item_t *item; + + while (NULL != (item = opal_list_remove_first(&fm->maps))) { + OBJ_RELEASE(item); + } + OBJ_DESTRUCT(&fm->maps); +} +OBJ_CLASS_INSTANCE(orte_dfs_jobfm_t, + opal_list_item_t, + jobfm_const, jobfm_dest); + +static void vpidfm_const(orte_dfs_vpidfm_t *fm) +{ + fm->fm.bytes = NULL; + fm->fm.size = 0; + fm->num_entries = 0; +} +static void vpidfm_dest(orte_dfs_vpidfm_t *fm) +{ + if (NULL != fm->fm.bytes) { + free(fm->fm.bytes); + } +} +OBJ_CLASS_INSTANCE(orte_dfs_vpidfm_t, + opal_list_item_t, + vpidfm_const, vpidfm_dest); diff --git a/orte/mca/dfs/base/dfs_base_select.c b/orte/mca/dfs/base/dfs_base_select.c index f0b43f6fb2..73ca819c72 100644 --- a/orte/mca/dfs/base/dfs_base_select.c +++ b/orte/mca/dfs/base/dfs_base_select.c @@ -24,29 +24,30 @@ int orte_dfs_base_select(void) { - int exit_status = OPAL_SUCCESS; + int exit_status = ORTE_SUCCESS; orte_dfs_base_component_t *best_component = NULL; orte_dfs_base_module_t *best_module = NULL; /* * Select the best component */ - if( OPAL_SUCCESS != mca_base_select("dfs", orte_dfs_base.output, + if (OPAL_SUCCESS != mca_base_select("dfs", orte_dfs_base.output, &orte_dfs_base.components_available, (mca_base_module_t **) &best_module, - (mca_base_component_t **) &best_component) ) { - /* This will only happen if no component was selected */ - exit_status = ORTE_ERROR; - goto cleanup; + (mca_base_component_t **) &best_component)) { + /* This will only happen if no component was selected, which + * is okay - we don't have to select anything + */ + return ORTE_SUCCESS; } /* Save the winner */ orte_dfs = *best_module; /* Initialize the winner */ - if (NULL != best_module) { - if (OPAL_SUCCESS != orte_dfs.init()) { - exit_status = OPAL_ERROR; + if (NULL != best_module && NULL != orte_dfs.init) { + if (ORTE_SUCCESS != orte_dfs.init()) { + exit_status = ORTE_ERROR; goto cleanup; } } diff --git a/orte/mca/dfs/dfs.h b/orte/mca/dfs/dfs.h index 08f0effb03..11660502d6 100644 --- a/orte/mca/dfs/dfs.h +++ b/orte/mca/dfs/dfs.h @@ -12,6 +12,7 @@ #define ORTE_MCA_DFS_H #include "orte_config.h" +#include "orte/types.h" #ifdef HAVE_FCNTL_H #include @@ -106,20 +107,54 @@ typedef void (*orte_dfs_base_module_read_fn_t)(int fd, uint8_t *buffer, void *cbdata); +/* Post a file map so others may access it */ +typedef void (*orte_dfs_base_module_post_file_map_fn_t)(opal_byte_object_t *bo, + orte_dfs_post_callback_fn_t cbfunc, + void *cbdata); + +/* Get the file map for a process + * + * Returns the file map associated with the specified process name. If + * NULL is provided, then all known process maps will be returned in the + * byte object. It is the responsibility of the caller to unpack it, so + * applications are free to specify whatever constitutes a "file map" that + * suits their purposes + */ +typedef void (*orte_dfs_base_module_get_file_map_fn_t)(orte_process_name_t *target, + orte_dfs_fm_callback_fn_t cbfunc, + void *cbdata); + + +/* Load file maps for a job + */ +typedef void (*orte_dfs_base_module_load_file_maps_fn_t)(orte_jobid_t jobid, + opal_byte_object_t *bo, + orte_dfs_load_callback_fn_t cbfunc, + void *cbdata); + +/* Purge file maps for a job */ +typedef void (*orte_dfs_base_module_purge_file_maps_fn_t)(orte_jobid_t jobid, + orte_dfs_purge_callback_fn_t cbfunc, + void *cbdata); + /* * Module Structure */ struct orte_dfs_base_module_1_0_0_t { /** Initialization Function */ - orte_dfs_base_module_init_fn_t init; + orte_dfs_base_module_init_fn_t init; /** Finalization Function */ - orte_dfs_base_module_finalize_fn_t finalize; + orte_dfs_base_module_finalize_fn_t finalize; - orte_dfs_base_module_open_fn_t open; - orte_dfs_base_module_close_fn_t close; - orte_dfs_base_module_get_file_size_fn_t get_file_size; - orte_dfs_base_module_seek_fn_t seek; - orte_dfs_base_module_read_fn_t read; + orte_dfs_base_module_open_fn_t open; + orte_dfs_base_module_close_fn_t close; + orte_dfs_base_module_get_file_size_fn_t get_file_size; + orte_dfs_base_module_seek_fn_t seek; + orte_dfs_base_module_read_fn_t read; + orte_dfs_base_module_post_file_map_fn_t post_file_map; + orte_dfs_base_module_get_file_map_fn_t get_file_map; + orte_dfs_base_module_load_file_maps_fn_t load_file_maps; + orte_dfs_base_module_purge_file_maps_fn_t purge_file_maps; }; typedef struct orte_dfs_base_module_1_0_0_t orte_dfs_base_module_1_0_0_t; typedef orte_dfs_base_module_1_0_0_t orte_dfs_base_module_t; diff --git a/orte/mca/dfs/dfs_types.h b/orte/mca/dfs/dfs_types.h index e2847bfcd6..2eabc7ee70 100644 --- a/orte/mca/dfs/dfs_types.h +++ b/orte/mca/dfs/dfs_types.h @@ -13,6 +13,8 @@ #include "orte_config.h" +#include "opal/dss/dss_types.h" + BEGIN_C_DECLS typedef uint8_t orte_dfs_cmd_t; @@ -23,6 +25,26 @@ typedef uint8_t orte_dfs_cmd_t; #define ORTE_DFS_SIZE_CMD 3 #define ORTE_DFS_SEEK_CMD 4 #define ORTE_DFS_READ_CMD 5 +#define ORTE_DFS_POST_CMD 6 +#define ORTE_DFS_GETFM_CMD 7 +#define ORTE_DFS_LOAD_CMD 8 +#define ORTE_DFS_PURGE_CMD 9 + +/* file maps */ +typedef struct { + opal_list_item_t super; + orte_jobid_t jobid; + opal_list_t maps; +} orte_dfs_jobfm_t; +ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_dfs_jobfm_t); + +typedef struct { + opal_list_item_t super; + orte_vpid_t vpid; + int num_entries; + opal_byte_object_t fm; +} orte_dfs_vpidfm_t; +ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_dfs_vpidfm_t); typedef void (*orte_dfs_open_callback_fn_t)(int fd, void *cbdata); @@ -36,6 +58,14 @@ typedef void (*orte_dfs_read_callback_fn_t)(long status, uint8_t *buffer, void *cbdata); +typedef void (*orte_dfs_post_callback_fn_t)(void *cbdata); + +typedef void (*orte_dfs_fm_callback_fn_t)(opal_byte_object_t *bo, void *cbdata); + +typedef void (*orte_dfs_load_callback_fn_t)(void *cbdata); + +typedef void (*orte_dfs_purge_callback_fn_t)(void *cbdata); + END_C_DECLS #endif diff --git a/orte/mca/dfs/orted/dfs_orted.c b/orte/mca/dfs/orted/dfs_orted.c index bdacb3ff9b..a4ed0e32f4 100644 --- a/orte/mca/dfs/orted/dfs_orted.c +++ b/orte/mca/dfs/orted/dfs_orted.c @@ -67,7 +67,19 @@ static void dfs_read(int fd, uint8_t *buffer, long length, orte_dfs_read_callback_fn_t cbfunc, void *cbdata); - +static void dfs_post_file_map(opal_byte_object_t *bo, + orte_dfs_post_callback_fn_t cbfunc, + void *cbdata); +static void dfs_get_file_map(orte_process_name_t *target, + orte_dfs_fm_callback_fn_t cbfunc, + void *cbdata); +static void dfs_load_file_maps(orte_jobid_t jobid, + opal_byte_object_t *bo, + orte_dfs_load_callback_fn_t cbfunc, + void *cbdata); +static void dfs_purge_file_maps(orte_jobid_t jobid, + orte_dfs_purge_callback_fn_t cbfunc, + void *cbdata); /****************** * Daemon/HNP module ******************/ @@ -78,10 +90,14 @@ orte_dfs_base_module_t orte_dfs_orted_module = { dfs_close, dfs_get_file_size, dfs_seek, - dfs_read + dfs_read, + dfs_post_file_map, + dfs_get_file_map, + dfs_load_file_maps, + dfs_purge_file_maps }; -static opal_list_t requests, active_files; +static opal_list_t requests, active_files, file_maps; static int local_fd = 0; static uint64_t req_id = 0; static void recv_dfs_cmd(int status, orte_process_name_t* sender, @@ -97,6 +113,7 @@ static int init(void) OBJ_CONSTRUCT(&requests, opal_list_t); OBJ_CONSTRUCT(&active_files, opal_list_t); + OBJ_CONSTRUCT(&file_maps, opal_list_t); if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DFS_CMD, ORTE_RML_PERSISTENT, @@ -128,6 +145,10 @@ static int finalize(void) OBJ_RELEASE(item); } OBJ_DESTRUCT(&active_files); + while (NULL != (item = opal_list_remove_first(&file_maps))) { + OBJ_RELEASE(item); + } + OBJ_DESTRUCT(&file_maps); return ORTE_SUCCESS; } @@ -794,6 +815,350 @@ static void dfs_read(int fd, uint8_t *buffer, ORTE_DFS_POST_REQUEST(dfs, process_reads); } +static void process_posts(int fd, short args, void *cbdata) +{ + orte_dfs_request_t *dfs = (orte_dfs_request_t*)cbdata; + orte_dfs_jobfm_t *jptr, *jfm; + orte_dfs_vpidfm_t *vptr, *vfm; + opal_list_item_t *item; + int rc; + opal_buffer_t buf; + + opal_output_verbose(1, orte_dfs_base.output, + "%s posting file map for target %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&dfs->target)); + + /* lookup the job map */ + jfm = NULL; + for (item = opal_list_get_first(&file_maps); + item != opal_list_get_end(&file_maps); + item = opal_list_get_next(item)) { + jptr = (orte_dfs_jobfm_t*)item; + if (jptr->jobid == dfs->target.jobid) { + jfm = jptr; + break; + } + } + if (NULL == jfm) { + /* add it */ + jfm = OBJ_NEW(orte_dfs_jobfm_t); + jfm->jobid = dfs->target.jobid; + opal_list_append(&file_maps, &jfm->super); + } + /* see if we already have an entry for this source */ + vfm = NULL; + for (item = opal_list_get_first(&jfm->maps); + item != opal_list_get_end(&jfm->maps); + item = opal_list_get_next(item)) { + vptr = (orte_dfs_vpidfm_t*)item; + if (vptr->vpid == dfs->target.vpid) { + vfm = vptr; + break; + } + } + if (NULL == vfm) { + /* add it */ + vfm = OBJ_NEW(orte_dfs_vpidfm_t); + vfm->vpid = dfs->target.vpid; + opal_list_append(&jfm->maps, &vfm->super); + } + + /* add this entry to any prior one */ + OBJ_CONSTRUCT(&buf, opal_buffer_t); + opal_dss.load(&buf, vfm->fm.bytes, vfm->fm.size); + if (OPAL_SUCCESS != (rc = opal_dss.pack(&buf, &dfs->bo, 1, OPAL_BYTE_OBJECT))) { + ORTE_ERROR_LOG(rc); + goto complete; + } + vfm->num_entries++; + opal_dss.unload(&buf, (void**)&vfm->fm.bytes, &vfm->fm.size); + opal_output_verbose(1, orte_dfs_base.output, + "%s target %s now has %d entries", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&dfs->target), + vfm->num_entries); + + complete: + if (NULL != dfs->post_cbfunc) { + dfs->post_cbfunc(dfs->cbdata); + } + OBJ_RELEASE(dfs); +} + +static void dfs_post_file_map(opal_byte_object_t *bo, + orte_dfs_post_callback_fn_t cbfunc, + void *cbdata) +{ + orte_dfs_request_t *dfs; + + dfs = OBJ_NEW(orte_dfs_request_t); + dfs->cmd = ORTE_DFS_POST_CMD; + dfs->target.jobid = ORTE_PROC_MY_NAME->jobid; + dfs->target.vpid = ORTE_PROC_MY_NAME->vpid; + dfs->bo = bo; + dfs->post_cbfunc = cbfunc; + dfs->cbdata = cbdata; + + /* post it for processing */ + ORTE_DFS_POST_REQUEST(dfs, process_posts); +} + +static void get_job_maps(orte_dfs_jobfm_t *jfm, + orte_vpid_t vpid, + opal_buffer_t *buf) +{ + orte_dfs_vpidfm_t *vfm; + opal_list_item_t *item; + opal_byte_object_t *boptr; + int rc; + + /* if the target vpid is WILDCARD, then process + * data for all vpids - else, find the one + */ + for (item = opal_list_get_first(&jfm->maps); + item != opal_list_get_end(&jfm->maps); + item = opal_list_get_next(item)) { + vfm = (orte_dfs_vpidfm_t*)item; + if (ORTE_VPID_WILDCARD == vpid || + vfm->vpid == vpid) { + /* indicate data from this vpid */ + if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &vfm->vpid, 1, ORTE_VPID))) { + ORTE_ERROR_LOG(rc); + return; + } + /* pack the number of entries in its byte object */ + if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &vfm->num_entries, 1, OPAL_INT32))) { + ORTE_ERROR_LOG(rc); + return; + } + /* pack the byte object */ + boptr = &vfm->fm; + if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &boptr, 1, OPAL_BYTE_OBJECT))) { + ORTE_ERROR_LOG(rc); + return; + } + } + } +} + +static void process_getfm(int fd, short args, void *cbdata) +{ + orte_dfs_request_t *dfs = (orte_dfs_request_t*)cbdata; + orte_dfs_jobfm_t *jfm; + opal_buffer_t buf; + opal_list_item_t *item; + opal_byte_object_t bo; + + /* prep the collection bucket */ + OBJ_CONSTRUCT(&buf, opal_buffer_t); + + /* if the target job is WILDCARD, then process + * data for all jobids - else, find the one + */ + for (item = opal_list_get_first(&file_maps); + item != opal_list_get_end(&file_maps); + item = opal_list_get_next(item)) { + jfm = (orte_dfs_jobfm_t*)item; + if (ORTE_JOBID_WILDCARD == dfs->target.jobid || + jfm->jobid == dfs->target.jobid) { + get_job_maps(jfm, dfs->target.vpid, &buf); + } + } + + /* unload the result */ + opal_dss.unload(&buf, (void**)&bo.bytes, &bo.size); + OBJ_DESTRUCT(&buf); + + if (NULL != dfs->fm_cbfunc) { + dfs->fm_cbfunc(&bo, dfs->cbdata); + } + if (NULL != bo.bytes) { + free(bo.bytes); + } + OBJ_RELEASE(dfs); +} + +static void dfs_get_file_map(orte_process_name_t *target, + orte_dfs_fm_callback_fn_t cbfunc, + void *cbdata) +{ + orte_dfs_request_t *dfs; + + opal_output_verbose(1, orte_dfs_base.output, + "%s get file map for %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(target)); + + dfs = OBJ_NEW(orte_dfs_request_t); + dfs->cmd = ORTE_DFS_GETFM_CMD; + dfs->target.jobid = target->jobid; + dfs->target.vpid = target->vpid; + dfs->fm_cbfunc = cbfunc; + dfs->cbdata = cbdata; + + /* post it for processing */ + ORTE_DFS_POST_REQUEST(dfs, process_getfm); +} + +static void process_load(int fd, short args, void *cbdata) +{ + orte_dfs_request_t *dfs = (orte_dfs_request_t*)cbdata; + opal_list_item_t *item; + orte_dfs_jobfm_t *jfm, *jptr; + orte_dfs_vpidfm_t *vfm; + orte_vpid_t vpid; + int32_t entries; + opal_byte_object_t *boptr; + opal_buffer_t buf; + int cnt; + int rc; + + /* see if we already have a tracker for this job */ + jfm = NULL; + for (item = opal_list_get_first(&file_maps); + item != opal_list_get_end(&file_maps); + item = opal_list_get_next(item)) { + jptr = (orte_dfs_jobfm_t*)item; + if (jptr->jobid == dfs->target.jobid) { + jfm = jptr; + break; + } + } + if (NULL != jfm) { + /* need to purge it first */ + while (NULL != (item = opal_list_remove_first(&jfm->maps))) { + OBJ_RELEASE(item); + } + } else { + jfm = OBJ_NEW(orte_dfs_jobfm_t); + jfm->jobid = dfs->target.jobid; + opal_list_append(&file_maps, &jfm->super); + } + + /* setup to unpack the byte object */ + OBJ_CONSTRUCT(&buf, opal_buffer_t); + opal_dss.load(&buf, dfs->bo->bytes, dfs->bo->size); + + /* unpack the buffer */ + cnt = 1; + while (OPAL_SUCCESS == opal_dss.unpack(&buf, &vpid, &cnt, ORTE_VPID)) { + /* unpack the number of entries contained in the byte object */ + cnt = 1; + if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, &entries, &cnt, OPAL_INT32))) { + ORTE_ERROR_LOG(rc); + goto complete; + } + /* unpack the byte object */ + cnt = 1; + if (OPAL_SUCCESS != (rc = opal_dss.unpack(&buf, &boptr, &cnt, OPAL_BYTE_OBJECT))) { + ORTE_ERROR_LOG(rc); + goto complete; + } + /* create the entry */ + vfm = OBJ_NEW(orte_dfs_vpidfm_t); + vfm->vpid = vpid; + vfm->num_entries = entries; + vfm->fm.bytes = boptr->bytes; + boptr->bytes = NULL; + vfm->fm.size = boptr->size; + opal_list_append(&jfm->maps, &vfm->super); + /* cleanup */ + free(boptr); + } + + complete: + /* must reload the byte object as it belongs to + * our original caller - the load function will + * have cleared it + */ + opal_dss.unload(&buf, (void**)&dfs->bo->bytes, &dfs->bo->size); + OBJ_DESTRUCT(&buf); + + if (NULL != dfs->load_cbfunc) { + dfs->load_cbfunc(dfs->cbdata); + } + OBJ_RELEASE(dfs); +} + +static void dfs_load_file_maps(orte_jobid_t jobid, + opal_byte_object_t *bo, + orte_dfs_load_callback_fn_t cbfunc, + void *cbdata) +{ + orte_dfs_request_t *dfs; + + opal_output_verbose(1, orte_dfs_base.output, + "%s loading file maps for %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_JOBID_PRINT(jobid)); + + dfs = OBJ_NEW(orte_dfs_request_t); + dfs->cmd = ORTE_DFS_LOAD_CMD; + dfs->target.jobid = jobid; + dfs->bo = bo; + dfs->load_cbfunc = cbfunc; + dfs->cbdata = cbdata; + + /* post it for processing */ + ORTE_DFS_POST_REQUEST(dfs, process_load); +} + +static void process_purge(int fd, short args, void *cbdata) +{ + orte_dfs_request_t *dfs = (orte_dfs_request_t*)cbdata; + opal_list_item_t *item; + orte_dfs_jobfm_t *jfm, *jptr; + + /* find the job tracker */ + jfm = NULL; + for (item = opal_list_get_first(&file_maps); + item != opal_list_get_end(&file_maps); + item = opal_list_get_next(item)) { + jptr = (orte_dfs_jobfm_t*)item; + if (jptr->jobid == dfs->target.jobid) { + jfm = jptr; + break; + } + } + if (NULL == jptr) { + ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); + } else { + /* remove it from the list */ + opal_list_remove_item(&file_maps, &jptr->super); + /* the destructor will release the list of maps + * in the jobfm object + */ + OBJ_RELEASE(jptr); + } + + if (NULL != dfs->purge_cbfunc) { + dfs->purge_cbfunc(dfs->cbdata); + } + OBJ_RELEASE(dfs); +} + +static void dfs_purge_file_maps(orte_jobid_t jobid, + orte_dfs_purge_callback_fn_t cbfunc, + void *cbdata) +{ + orte_dfs_request_t *dfs; + + opal_output_verbose(1, orte_dfs_base.output, + "%s purging file maps for job %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_JOBID_PRINT(jobid)); + + dfs = OBJ_NEW(orte_dfs_request_t); + dfs->cmd = ORTE_DFS_PURGE_CMD; + dfs->target.jobid = jobid; + dfs->purge_cbfunc = cbfunc; + dfs->cbdata = cbdata; + + /* post it for processing */ + ORTE_DFS_POST_REQUEST(dfs, process_purge); +} + /* receives take place in an event, so we are free to process * the request list without fear of getting things out-of-order @@ -813,6 +1178,11 @@ static void recv_dfs_cmd(int status, orte_process_name_t* sender, uint64_t rid; int whence; struct stat buf; + orte_process_name_t source; + opal_byte_object_t *bo; + orte_dfs_request_t *dfs; + orte_dfs_jobfm_t *jfm; + opal_buffer_t *answer, bucket; /* unpack the command */ cnt = 1; @@ -860,25 +1230,25 @@ static void recv_dfs_cmd(int status, orte_process_name_t* sender, opal_list_append(&active_files, &trk->super); answer_open: /* construct the return message */ - buffer = OBJ_NEW(opal_buffer_t); - if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &cmd, 1, ORTE_DFS_CMD_T))) { + answer = OBJ_NEW(opal_buffer_t); + if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &cmd, 1, ORTE_DFS_CMD_T))) { ORTE_ERROR_LOG(rc); return; } - if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &rid, 1, OPAL_UINT64))) { + if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &rid, 1, OPAL_UINT64))) { ORTE_ERROR_LOG(rc); return; } - if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &my_fd, 1, OPAL_INT))) { + if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &my_fd, 1, OPAL_INT))) { ORTE_ERROR_LOG(rc); return; } /* send it */ - if (0 > (rc = orte_rml.send_buffer_nb(sender, buffer, + if (0 > (rc = orte_rml.send_buffer_nb(sender, answer, ORTE_RML_TAG_DFS_DATA, 0, orte_rml_send_callback, NULL))) { ORTE_ERROR_LOG(rc); - OBJ_RELEASE(buffer); + OBJ_RELEASE(answer); return; } break; @@ -940,25 +1310,25 @@ static void recv_dfs_cmd(int status, orte_process_name_t* sender, } } /* construct the return message */ - buffer = OBJ_NEW(opal_buffer_t); - if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &cmd, 1, ORTE_DFS_CMD_T))) { + answer = OBJ_NEW(opal_buffer_t); + if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &cmd, 1, ORTE_DFS_CMD_T))) { ORTE_ERROR_LOG(rc); return; } - if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &rid, 1, OPAL_UINT64))) { + if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &rid, 1, OPAL_UINT64))) { ORTE_ERROR_LOG(rc); return; } - if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &i64, 1, OPAL_INT64))) { + if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &i64, 1, OPAL_INT64))) { ORTE_ERROR_LOG(rc); return; } /* send it */ - if (0 > (rc = orte_rml.send_buffer_nb(sender, buffer, + if (0 > (rc = orte_rml.send_buffer_nb(sender, answer, ORTE_RML_TAG_DFS_DATA, 0, orte_rml_send_callback, NULL))) { ORTE_ERROR_LOG(rc); - OBJ_RELEASE(buffer); + OBJ_RELEASE(answer); return; } break; @@ -1033,17 +1403,17 @@ static void recv_dfs_cmd(int status, orte_process_name_t* sender, } } /* construct the return message */ - buffer = OBJ_NEW(opal_buffer_t); - if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &cmd, 1, ORTE_DFS_CMD_T))) { + answer = OBJ_NEW(opal_buffer_t); + if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &cmd, 1, ORTE_DFS_CMD_T))) { ORTE_ERROR_LOG(rc); return; } - if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &rid, 1, OPAL_UINT64))) { + if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &rid, 1, OPAL_UINT64))) { ORTE_ERROR_LOG(rc); return; } /* return the offset/status */ - if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &bytes_read, 1, OPAL_INT64))) { + if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &bytes_read, 1, OPAL_INT64))) { ORTE_ERROR_LOG(rc); return; } @@ -1053,11 +1423,11 @@ static void recv_dfs_cmd(int status, orte_process_name_t* sender, ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (long)bytes_read, ORTE_NAME_PRINT(sender)); - if (0 > (rc = orte_rml.send_buffer_nb(sender, buffer, + if (0 > (rc = orte_rml.send_buffer_nb(sender, answer, ORTE_RML_TAG_DFS_DATA, 0, orte_rml_send_callback, NULL))) { ORTE_ERROR_LOG(rc); - OBJ_RELEASE(buffer); + OBJ_RELEASE(answer); return; } break; @@ -1113,23 +1483,23 @@ static void recv_dfs_cmd(int status, orte_process_name_t* sender, } answer_read: /* construct the return message */ - buffer = OBJ_NEW(opal_buffer_t); - if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &cmd, 1, ORTE_DFS_CMD_T))) { + answer = OBJ_NEW(opal_buffer_t); + if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &cmd, 1, ORTE_DFS_CMD_T))) { ORTE_ERROR_LOG(rc); return; } - if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &rid, 1, OPAL_UINT64))) { + if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &rid, 1, OPAL_UINT64))) { ORTE_ERROR_LOG(rc); return; } /* include the number of bytes read */ - if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &bytes_read, 1, OPAL_INT64))) { + if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &bytes_read, 1, OPAL_INT64))) { ORTE_ERROR_LOG(rc); return; } /* include the bytes read */ if (0 < bytes_read) { - if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, read_buf, bytes_read, OPAL_UINT8))) { + if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, read_buf, bytes_read, OPAL_UINT8))) { ORTE_ERROR_LOG(rc); return; } @@ -1140,11 +1510,11 @@ static void recv_dfs_cmd(int status, orte_process_name_t* sender, ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (long)bytes_read, ORTE_NAME_PRINT(sender)); - if (0 > (rc = orte_rml.send_buffer_nb(sender, buffer, + if (0 > (rc = orte_rml.send_buffer_nb(sender, answer, ORTE_RML_TAG_DFS_DATA, 0, orte_rml_send_callback, NULL))) { ORTE_ERROR_LOG(rc); - OBJ_RELEASE(buffer); + OBJ_RELEASE(answer); return; } if (NULL != read_buf) { @@ -1152,6 +1522,117 @@ static void recv_dfs_cmd(int status, orte_process_name_t* sender, } break; + case ORTE_DFS_POST_CMD: + /* unpack their request id */ + cnt = 1; + if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) { + ORTE_ERROR_LOG(rc); + return; + } + /* unpack the name of the source of this data */ + cnt = 1; + if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &source, &cnt, ORTE_NAME))) { + ORTE_ERROR_LOG(rc); + return; + } + /* unpack their byte object */ + cnt = 1; + if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &bo, &cnt, OPAL_BYTE_OBJECT))) { + ORTE_ERROR_LOG(rc); + goto answer_read; + } + /* add the contents to the storage for this process */ + dfs = OBJ_NEW(orte_dfs_request_t); + dfs->target.jobid = source.jobid; + dfs->target.vpid = source.vpid; + dfs->bo = bo; + dfs->post_cbfunc = NULL; + process_posts(0, 0, (void*)dfs); + /* return an ack */ + answer = OBJ_NEW(opal_buffer_t); + if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &cmd, 1, ORTE_DFS_CMD_T))) { + ORTE_ERROR_LOG(rc); + return; + } + if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &rid, 1, OPAL_UINT64))) { + ORTE_ERROR_LOG(rc); + return; + } + if (0 > (rc = orte_rml.send_buffer_nb(sender, answer, + ORTE_RML_TAG_DFS_DATA, 0, + orte_rml_send_callback, NULL))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(answer); + } + if (NULL != bo->bytes) { + free(bo->bytes); + } + free(bo); + break; + + case ORTE_DFS_GETFM_CMD: + /* unpack their request id */ + cnt = 1; + if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) { + ORTE_ERROR_LOG(rc); + return; + } + /* unpack the target */ + cnt = 1; + if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &source, &cnt, ORTE_NAME))) { + ORTE_ERROR_LOG(rc); + goto answer_read; + } + /* search our data tree for matches, assembling them + * into a byte object + */ + /* if the target job is WILDCARD, then process + * data for all jobids - else, find the one + */ + OBJ_CONSTRUCT(&bucket, opal_buffer_t); + for (item = opal_list_get_first(&file_maps); + item != opal_list_get_end(&file_maps); + item = opal_list_get_next(item)) { + jfm = (orte_dfs_jobfm_t*)item; + if (ORTE_JOBID_WILDCARD == source.jobid || + jfm->jobid == source.jobid) { + get_job_maps(jfm, source.vpid, &bucket); + } + } + /* unload the result */ + bo = (opal_byte_object_t*)malloc(sizeof(opal_byte_object_t)); + opal_dss.unload(&bucket, (void**)&bo->bytes, &bo->size); + OBJ_DESTRUCT(&bucket); + opal_output_verbose(1, orte_dfs_base.output, + "%s getf-cmd: returning %d bytes to sender %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), bo->size, + ORTE_NAME_PRINT(sender)); + /* construct the response */ + answer = OBJ_NEW(opal_buffer_t); + if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &cmd, 1, ORTE_DFS_CMD_T))) { + ORTE_ERROR_LOG(rc); + return; + } + if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &rid, 1, OPAL_UINT64))) { + ORTE_ERROR_LOG(rc); + return; + } + if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &bo, 1, OPAL_BYTE_OBJECT))) { + ORTE_ERROR_LOG(rc); + return; + } + if (NULL != bo->bytes) { + free(bo->bytes); + } + free(bo); + if (0 > (rc = orte_rml.send_buffer_nb(sender, answer, + ORTE_RML_TAG_DFS_DATA, 0, + orte_rml_send_callback, NULL))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(answer); + } + break; + default: opal_output(0, "ORTED:DFS:RECV_DFS WTF"); break;