1
1

Update DFS to support multi-node operations

This commit was SVN r27594.
Этот коммит содержится в:
Ralph Castain 2012-11-12 02:54:53 +00:00
родитель fefec03e78
Коммит fe6dfad625
7 изменённых файлов: 249 добавлений и 116 удалений

Просмотреть файл

@ -214,7 +214,10 @@ static void recv_dfs(int status, orte_process_name_t* sender,
trk->requestor.vpid = ORTE_PROC_MY_NAME->vpid;
trk->host_daemon.jobid = sender->jobid;
trk->host_daemon.vpid = sender->vpid;
trk->filename = strdup(dfs->uri);
trk->uri = strdup(dfs->uri);
/* break the uri down into scheme and filename */
trk->scheme = opal_uri_get_scheme(dfs->uri);
trk->filename = opal_filename_from_uri(dfs->uri, NULL);
/* define the local fd */
trk->local_fd = local_fd++;
/* record the remote file descriptor */
@ -472,6 +475,9 @@ static void open_local_file(orte_dfs_request_t *dfs)
trk = OBJ_NEW(orte_dfs_tracker_t);
trk->requestor.jobid = ORTE_PROC_MY_NAME->jobid;
trk->requestor.vpid = ORTE_PROC_MY_NAME->vpid;
trk->uri = strdup(dfs->uri);
/* break the uri down into scheme and filename */
trk->scheme = opal_uri_get_scheme(dfs->uri);
trk->filename = strdup(filename);
/* define the local fd */
trk->local_fd = local_fd++;
@ -503,13 +509,11 @@ static void process_opens(int fd, short args, void *cbdata)
bool found;
orte_vpid_t v;
opal_output(0, "%s PROCESSING OPEN", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
/* get the scheme to determine if we can process locally or not */
if (NULL == (scheme = opal_uri_get_scheme(dfs->uri))) {
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
goto complete;
}
opal_output(0, "%s GOT SCHEME", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
if (0 == strcmp(scheme, "nfs")) {
open_local_file(dfs);
@ -529,12 +533,10 @@ static void process_opens(int fd, short args, void *cbdata)
if (NULL == (filename = opal_filename_from_uri(dfs->uri, &host))) {
goto complete;
}
opal_output(0, "%s GOT FILENAME %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), filename);
if (NULL == host) {
host = strdup(orte_process_info.nodename);
}
#if 0
/* if the host is our own, then treat it as a local file */
if (NULL == host ||
0 == strcmp(host, orte_process_info.nodename) ||
@ -549,7 +551,6 @@ static void process_opens(int fd, short args, void *cbdata)
OBJ_RELEASE(dfs);
return;
}
#endif
/* ident the daemon on that host */
daemon.jobid = ORTE_PROC_MY_DAEMON->jobid;
@ -575,7 +576,7 @@ static void process_opens(int fd, short args, void *cbdata)
"%s file %s on host %s daemon %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
filename, host, ORTE_NAME_PRINT(&daemon));
#if 0
/* double-check: if it is our local daemon, then we
* treat this as local
*/
@ -589,7 +590,7 @@ static void process_opens(int fd, short args, void *cbdata)
OBJ_RELEASE(dfs);
return;
}
#endif
/* add this request to our local list so we can
* match it with the returned response when it comes
*/

Просмотреть файл

@ -49,7 +49,9 @@ typedef struct {
opal_list_item_t super;
orte_process_name_t requestor;
orte_process_name_t host_daemon;
char *filename; /* for debug purposes */
char *uri;
char *scheme;
char *filename;
int local_fd;
int remote_fd;
size_t location;

Просмотреть файл

@ -92,11 +92,19 @@ static void trk_con(orte_dfs_tracker_t *trk)
{
trk->host_daemon.jobid = ORTE_JOBID_INVALID;
trk->host_daemon.vpid = ORTE_VPID_INVALID;
trk->uri = NULL;
trk->scheme = NULL;
trk->filename = NULL;
trk->location = 0;
}
static void trk_des(orte_dfs_tracker_t *trk)
{
if (NULL != trk->uri) {
free(trk->uri);
}
if (NULL != trk->scheme) {
free(trk->scheme);
}
if (NULL != trk->filename) {
free(trk->filename);
}

Просмотреть файл

@ -21,15 +21,16 @@ BEGIN_C_DECLS
typedef uint8_t orte_dfs_cmd_t;
#define ORTE_DFS_CMD_T OPAL_UINT8
#define ORTE_DFS_OPEN_CMD 1
#define ORTE_DFS_CLOSE_CMD 2
#define ORTE_DFS_SIZE_CMD 3
#define ORTE_DFS_SEEK_CMD 4
#define ORTE_DFS_READ_CMD 5
#define ORTE_DFS_POST_CMD 6
#define ORTE_DFS_GETFM_CMD 7
#define ORTE_DFS_LOAD_CMD 8
#define ORTE_DFS_PURGE_CMD 9
#define ORTE_DFS_OPEN_CMD 1
#define ORTE_DFS_CLOSE_CMD 2
#define ORTE_DFS_SIZE_CMD 3
#define ORTE_DFS_SEEK_CMD 4
#define ORTE_DFS_READ_CMD 5
#define ORTE_DFS_POST_CMD 6
#define ORTE_DFS_GETFM_CMD 7
#define ORTE_DFS_LOAD_CMD 8
#define ORTE_DFS_PURGE_CMD 9
#define ORTE_DFS_RELAY_POSTS_CMD 10
/* file maps */
typedef struct {

Просмотреть файл

@ -949,6 +949,7 @@ static void process_getfm(int fd, short args, void *cbdata)
* data for all jobids - else, find the one
*/
ntotal = 0;
n = -1;
for (item = opal_list_get_first(&file_maps);
item != opal_list_get_end(&file_maps);
item = opal_list_get_next(item)) {
@ -1051,6 +1052,10 @@ static void process_load(int fd, short args, void *cbdata)
goto complete;
}
opal_output_verbose(1, orte_dfs_base.output,
"%s loading file maps from %d vpids",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), nvpids);
/* unpack the buffer */
for (i=0; i < nvpids; i++) {
/* unpack this vpid */
@ -1136,15 +1141,15 @@ static void process_purge(int fd, short args, void *cbdata)
break;
}
}
if (NULL == jptr) {
if (NULL == jfm) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
} else {
/* remove it from the list */
opal_list_remove_item(&file_maps, &jptr->super);
opal_list_remove_item(&file_maps, &jfm->super);
/* the destructor will release the list of maps
* in the jobfm object
*/
OBJ_RELEASE(jptr);
OBJ_RELEASE(jfm);
}
if (NULL != dfs->purge_cbfunc) {
@ -1195,10 +1200,14 @@ static void recv_dfs_cmd(int status, orte_process_name_t* sender,
int whence;
struct stat buf;
orte_process_name_t source;
opal_buffer_t *bptr;
opal_buffer_t *bptr, *xfer;
orte_dfs_request_t *dfs;
orte_dfs_jobfm_t *jfm;
orte_dfs_jobfm_t *jfm, *jptr;
orte_dfs_vpidfm_t *vfm, *vptr;
opal_buffer_t *answer, bucket;
int i, j;
orte_vpid_t vpid;
int32_t nentries, ncontributors;
/* unpack the command */
cnt = 1;
@ -1539,6 +1548,10 @@ static void recv_dfs_cmd(int status, orte_process_name_t* sender,
break;
case ORTE_DFS_POST_CMD:
opal_output_verbose(1, orte_dfs_base.output,
"%s received post command from %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender));
/* unpack their request id */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) {
@ -1566,22 +1579,120 @@ static void recv_dfs_cmd(int status, orte_process_name_t* sender,
process_posts(0, 0, (void*)dfs);
OBJ_RELEASE(bptr);
answer_post:
/* return an ack */
answer = OBJ_NEW(opal_buffer_t);
if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &cmd, 1, ORTE_DFS_CMD_T))) {
if (UINT64_MAX != rid) {
/* return an ack */
answer = OBJ_NEW(opal_buffer_t);
if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &cmd, 1, ORTE_DFS_CMD_T))) {
ORTE_ERROR_LOG(rc);
return;
}
if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &rid, 1, OPAL_UINT64))) {
ORTE_ERROR_LOG(rc);
return;
}
if (0 > (rc = orte_rml.send_buffer_nb(sender, answer,
ORTE_RML_TAG_DFS_DATA, 0,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(answer);
}
}
break;
case ORTE_DFS_RELAY_POSTS_CMD:
/* unpack the name of the source of this data */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &source, &cnt, ORTE_NAME))) {
ORTE_ERROR_LOG(rc);
return;
}
if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &rid, 1, OPAL_UINT64))) {
opal_output_verbose(1, orte_dfs_base.output,
"%s received relayed posts from sender %s for source %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender),
ORTE_NAME_PRINT(&source));
/* lookup the job map */
jfm = NULL;
for (item = opal_list_get_first(&file_maps);
item != opal_list_get_end(&file_maps);
item = opal_list_get_next(item)) {
jptr = (orte_dfs_jobfm_t*)item;
if (jptr->jobid == source.jobid) {
jfm = jptr;
break;
}
}
if (NULL == jfm) {
/* add it */
jfm = OBJ_NEW(orte_dfs_jobfm_t);
jfm->jobid = source.jobid;
opal_list_append(&file_maps, &jfm->super);
}
/* see if we already have an entry for this source */
vfm = NULL;
for (item = opal_list_get_first(&jfm->maps);
item != opal_list_get_end(&jfm->maps);
item = opal_list_get_next(item)) {
vptr = (orte_dfs_vpidfm_t*)item;
if (vptr->vpid == source.vpid) {
vfm = vptr;
break;
}
}
if (NULL == vfm) {
/* add it */
vfm = OBJ_NEW(orte_dfs_vpidfm_t);
vfm->vpid = source.vpid;
opal_list_append(&jfm->maps, &vfm->super);
}
/* unpack their buffer object */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &bptr, &cnt, OPAL_BUFFER))) {
ORTE_ERROR_LOG(rc);
return;
}
if (0 > (rc = orte_rml.send_buffer_nb(sender, answer,
ORTE_RML_TAG_DFS_DATA, 0,
orte_rml_send_callback, NULL))) {
/* the buffer object came from a call to get_file_maps, so it isn't quite
* the same as when someone posts directly to us. So process it here by
* starting with getting the number of vpids that contributed. This
* should always be one, but leave it open for flexibility
*/
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(bptr, &ncontributors, &cnt, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(answer);
return;
}
/* loop thru the number of contributors */
for (i=0; i < ncontributors; i++) {
/* unpack the vpid of the contributor */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(bptr, &vpid, &cnt, ORTE_VPID))) {
ORTE_ERROR_LOG(rc);
return;
}
/* unpack the number of entries */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(bptr, &nentries, &cnt, OPAL_INT32))) {
ORTE_ERROR_LOG(rc);
return;
}
for (j=0; j < nentries; j++) {
/* get the entry */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(bptr, &xfer, &cnt, OPAL_BUFFER))) {
ORTE_ERROR_LOG(rc);
return;
}
/* store it */
if (OPAL_SUCCESS != (rc = opal_dss.pack(&vfm->data, &xfer, 1, OPAL_BUFFER))) {
ORTE_ERROR_LOG(rc);
return;
}
OBJ_RELEASE(xfer);
vfm->num_entries++;
}
}
OBJ_RELEASE(bptr);
/* no reply required */
break;
case ORTE_DFS_GETFM_CMD:
@ -1645,9 +1756,9 @@ static void recv_dfs_cmd(int status, orte_process_name_t* sender,
}
OBJ_DESTRUCT(&bucket);
opal_output_verbose(1, orte_dfs_base.output,
"%s getf-cmd: returning %d maps to sender %s",
"%s getf-cmd: returning %d maps with %d bytes to sender %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), nmaps,
ORTE_NAME_PRINT(sender));
(int)answer->bytes_used, ORTE_NAME_PRINT(sender));
if (0 > (rc = orte_rml.send_buffer_nb(sender, answer,
ORTE_RML_TAG_DFS_DATA, 0,
orte_rml_send_callback, NULL))) {

Просмотреть файл

@ -20,6 +20,7 @@
#include "opal/util/output.h"
#include "orte/mca/dfs/dfs.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/iof/iof.h"
#include "orte/mca/rml/rml.h"
@ -158,17 +159,98 @@ static void track_jobs(int fd, short argc, void *cbdata)
OBJ_RELEASE(caddy);
}
/*
 * File-map callback handed to orte_dfs.get_file_map() by track_procs when a
 * local proc has terminated.
 *
 * bptr   - buffer holding the file maps retrieved for the proc
 * cbdata - the orte_proc_t of the terminated proc
 *
 * The function relays the file maps to the HNP (ORTE_DFS_RELAY_POSTS_CMD on
 * ORTE_RML_TAG_DFS_CMD), finalizes the proc's session directory, sends the
 * HNP a proc-state update on ORTE_RML_TAG_PLM, and finally removes the proc
 * from orte_local_children.
 *
 * NOTE(review): bptr is dereferenced unconditionally; confirm that
 * get_file_map never invokes this callback with a NULL buffer.
 * NOTE(review): any failure while building or sending the relay message
 * returns before the termination update is sent - confirm this is intended.
 */
static void send_fms(opal_buffer_t *bptr, void *cbdata)
{
    orte_proc_t *pdata = (orte_proc_t*)cbdata;
    orte_proc_t *pptr;
    orte_job_t *jdata;
    opal_buffer_t *xfer, *alert;
    orte_dfs_cmd_t cmd = ORTE_DFS_RELAY_POSTS_CMD;
    int rc, i;
    orte_plm_cmd_flag_t cmd2;

    opal_output(0, "%s SENDING FILE MAPS FOR %s OF SIZE %d",
                ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                ORTE_NAME_PRINT(&pdata->name), (int)bptr->bytes_used);

    /* build the relay message: command, name of the source proc, file maps */
    xfer = OBJ_NEW(opal_buffer_t);
    if (OPAL_SUCCESS != (rc = opal_dss.pack(xfer, &cmd, 1, ORTE_DFS_CMD_T))) {
        ORTE_ERROR_LOG(rc);
        OBJ_RELEASE(xfer);
        return;
    }
    if (OPAL_SUCCESS != (rc = opal_dss.pack(xfer, &pdata->name, 1, ORTE_NAME))) {
        ORTE_ERROR_LOG(rc);
        OBJ_RELEASE(xfer);
        return;
    }
    if (OPAL_SUCCESS != (rc = opal_dss.pack(xfer, &bptr, 1, OPAL_BUFFER))) {
        ORTE_ERROR_LOG(rc);
        OBJ_RELEASE(xfer);
        return;
    }
    /* on success the buffer is handed to the send callback (presumably
     * released there - verify); on failure it is still ours to release
     */
    if (0 > (rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, xfer,
                                          ORTE_RML_TAG_DFS_CMD, 0,
                                          orte_rml_send_callback, NULL))) {
        ORTE_ERROR_LOG(rc);
        OBJ_RELEASE(xfer);
        return;
    }

    /* Clean up the session directory as if we were the process
     * itself. This covers the case where the process died abnormally
     * and didn't cleanup its own session directory.
     */
    orte_session_dir_finalize(&pdata->name);

    /* alert the HNP */
    cmd2 = ORTE_PLM_UPDATE_PROC_STATE;
    alert = OBJ_NEW(opal_buffer_t);
    if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &cmd2, 1, ORTE_PLM_CMD))) {
        ORTE_ERROR_LOG(rc);
        OBJ_RELEASE(alert);
        return;
    }
    /* get the job object for this proc */
    if (NULL == (jdata = orte_get_job_data_object(pdata->name.jobid))) {
        ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
        /* the alert buffer was never sent, so release it here */
        OBJ_RELEASE(alert);
        return;
    }
    /* pack the info - a failure is logged, but the update is still sent */
    if (ORTE_SUCCESS != (rc = pack_state_update(alert, jdata, pdata))) {
        ORTE_ERROR_LOG(rc);
    }
    /* send it */
    OPAL_OUTPUT_VERBOSE((5, orte_state_base_output,
                         "%s SENDING TERMINATION UPDATE FOR PROC %s",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         ORTE_NAME_PRINT(&pdata->name)));
    if (0 > (rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, alert,
                                          ORTE_RML_TAG_PLM, 0,
                                          orte_rml_send_callback, NULL))) {
        ORTE_ERROR_LOG(rc);
        /* mirror the relay-send failure path above: the buffer was not
         * accepted for sending, so release it ourselves
         */
        OBJ_RELEASE(alert);
    }

    /* find this proc in the children array and remove it so
     * we don't keep telling the HNP that it died
     */
    for (i=0; i < orte_local_children->size; i++) {
        if (NULL == (pptr = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i))) {
            continue;
        }
        if (pptr == pdata) {
            opal_pointer_array_set_item(orte_local_children, i, NULL);
            OBJ_RELEASE(pdata);
            break;
        }
    }
}
static void track_procs(int fd, short argc, void *cbdata)
{
orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata;
orte_process_name_t *proc = &caddy->name;
orte_proc_state_t state = caddy->proc_state;
orte_job_t *jdata;
orte_proc_t *pdata, *pptr;
opal_buffer_t *alert;
int rc;
orte_plm_cmd_flag_t cmd;
int i;
orte_proc_t *pdata;
OPAL_OUTPUT_VERBOSE((5, orte_state_base_output,
"%s state:staged_orted:track_procs called for proc %s state %s",
@ -213,45 +295,10 @@ static void track_procs(int fd, short argc, void *cbdata)
/* the proc has terminated */
pdata->alive = false;
pdata->state = ORTE_PROC_STATE_TERMINATED;
/* Clean up the session directory as if we were the process
* itself. This covers the case where the process died abnormally
* and didn't cleanup its own session directory.
/* retrieve any file maps posted by this process and forward them
* to the HNP for collection
*/
orte_session_dir_finalize(proc);
/* alert the HNP */
cmd = ORTE_PLM_UPDATE_PROC_STATE;
alert = OBJ_NEW(opal_buffer_t);
if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &cmd, 1, ORTE_PLM_CMD))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* pack the info */
if (ORTE_SUCCESS != (rc = pack_state_update(alert, jdata, pdata))) {
ORTE_ERROR_LOG(rc);
}
/* send it */
OPAL_OUTPUT_VERBOSE((5, orte_state_base_output,
"%s SENDING TERMINATION UPDATE FOR PROC %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&pdata->name)));
if (0 > (rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, alert,
ORTE_RML_TAG_PLM, 0,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
}
/* find this proc in the children array and remove it so
* we don't keep telling the HNP that it died
*/
for (i=0; i < orte_local_children->size; i++) {
if (NULL == (pptr = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i))) {
continue;
}
if (pptr == pdata) {
opal_pointer_array_set_item(orte_local_children, i, NULL);
OBJ_RELEASE(pdata);
break;
}
}
orte_dfs.get_file_map(proc, send_fms, pdata);
}
/* Release the stdin IOF file descriptor for this child, if one
* was defined. File descriptors for the other IOF channels - stdout,
@ -275,45 +322,10 @@ static void track_procs(int fd, short argc, void *cbdata)
/* the proc has terminated */
pdata->alive = false;
pdata->state = ORTE_PROC_STATE_TERMINATED;
/* Clean up the session directory as if we were the process
* itself. This covers the case where the process died abnormally
* and didn't cleanup its own session directory.
/* retrieve any file maps posted by this process and forward them
* to the HNP for collection
*/
orte_session_dir_finalize(proc);
/* alert the HNP */
cmd = ORTE_PLM_UPDATE_PROC_STATE;
alert = OBJ_NEW(opal_buffer_t);
if (ORTE_SUCCESS != (rc = opal_dss.pack(alert, &cmd, 1, ORTE_PLM_CMD))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* pack the info */
if (ORTE_SUCCESS != (rc = pack_state_update(alert, jdata, pdata))) {
ORTE_ERROR_LOG(rc);
}
/* send it */
OPAL_OUTPUT_VERBOSE((5, orte_state_base_output,
"%s SENDING TERMINATION UPDATE FOR PROC %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&pdata->name)));
if (0 > (rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, alert,
ORTE_RML_TAG_PLM, 0,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
}
/* find this proc in the children array and remove it so
* we don't keep telling the HNP that it died
*/
for (i=0; i < orte_local_children->size; i++) {
if (NULL == (pptr = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i))) {
continue;
}
if (pptr == pdata) {
opal_pointer_array_set_item(orte_local_children, i, NULL);
OBJ_RELEASE(pdata);
break;
}
}
orte_dfs.get_file_map(proc, send_fms, pdata);
}
break;

Просмотреть файл

@ -83,7 +83,6 @@ static void read_cbfunc(long status, uint8_t *buffer, void *cbdata)
{
int *check = (int*)cbdata;
opal_output(0, "GOT READ STATUS %d", (int)status);
if (status < 0) {
read_active = false;
active = false;
@ -158,7 +157,6 @@ int main(int argc, char* argv[])
numread = 0;
while (read_active) {
i = READ_SIZE;
opal_output(0, "%s reading next %d bytes\n", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), i);
active = true;
orte_dfs.read(fd, buffer, READ_SIZE, read_cbfunc, &i);
ORTE_WAIT_FOR_COMPLETION(active);