/* -*- C -*- * * $HEADER$ * */ #include #include #include #include "opal/util/output.h" #include "opal/util/uri.h" #include "opal/mca/event/event.h" #include "orte/util/proc_info.h" #include "orte/util/name_fns.h" #include "orte/mca/errmgr/errmgr.h" #include "orte/runtime/orte_globals.h" #include "orte/runtime/runtime.h" #include "orte/runtime/orte_wait.h" #include "orte/mca/dfs/dfs.h" static bool active; static bool read_active; static int numread = 0; #define READ_SIZE 500 #define OFFSET_VALUE 313 static void dfs_open_cbfunc(int fd, void *cbdata) { int *remote_fd = (int*)cbdata; opal_output(0, "GOT FD %d", fd); *remote_fd = fd; active = false; } static void dfs_close_cbfunc(int fd, void *cbdata) { opal_output(0, "CLOSE CONFIRMED"); active = false; } static void dfs_size_cbfunc(long size, void *cbdata) { opal_output(0, "GOT FILE SIZE %ld", size); active = false; } static void dfs_seek_cbfunc(long offset, void *cbdata) { int *check = (int*)cbdata; opal_output(0, "GOT FILE OFFSET %ld", offset); active = false; if (NULL != cbdata && offset != *check) { exit(1); } } static void dfs_post_cbfunc(void *cbdata) { opal_buffer_t *bo = (opal_buffer_t*)cbdata; opal_output(0, "GOT POST CALLBACK"); active = false; OBJ_RELEASE(bo); } static void dfs_getfm_cbfunc(opal_buffer_t *bo, void *cbdata) { opal_buffer_t *bptr = (opal_buffer_t*)cbdata; opal_output(0, "GOT GETFM CALLBACK"); active = false; opal_dss.copy_payload(bptr, bo); } static void read_cbfunc(long status, uint8_t *buffer, void *cbdata) { int *check = (int*)cbdata; if (status < 0) { read_active = false; active = false; return; } numread += status; if (NULL != cbdata && status < *check) { read_active = false; opal_output(0, "EOF RECEIVED: read total of %d bytes", numread); active = false; return; } active = false; } int main(int argc, char* argv[]) { int rc; int fd; char *uri, *host, *path; uint8_t buffer[READ_SIZE]; opal_buffer_t *buf, *xfer; int i, k, cnt; int64_t i64, length, offset, partition; int32_t n, nvpids, nentries; orte_vpid_t vpid; if (0 != (rc = orte_init(&argc, &argv, ORTE_PROC_NON_MPI))) { fprintf(stderr, "orte_dfs: couldn't init orte - error code %d\n", rc); return rc; } /* if I am part of an initial job, then test my basic * API operations */ if (1 == ORTE_LOCAL_JOBID(ORTE_PROC_MY_NAME->jobid)) { /* user must provide a file to be read - the contents * of the file will be output to stdout */ if (1 == argc) { fprintf(stderr, "Usage: orte_dfs bytes_used); /* retrieve the number of vpids in the map */ cnt = 1; if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &nvpids, &cnt, OPAL_INT32))) { ORTE_ERROR_LOG(rc); return 1; } opal_output(0, "%s RECVD DATA FROM %d VPIDS", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), nvpids); /* find a partition for us */ for (k=0; k < nvpids; k++) { cnt = 1; if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &vpid, &cnt, ORTE_VPID))) { ORTE_ERROR_LOG(rc); break; } opal_output(0, "CHECKING VPID %s", ORTE_VPID_PRINT(vpid)); cnt = 1; if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &nentries, &cnt, OPAL_INT32))) { ORTE_ERROR_LOG(rc); break; } opal_output(0, "%s RECVD %d ENTRIES IN THIS MAP", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), nentries); cnt = 1; for (i=0; i < nentries; i++) { cnt = 1; if (OPAL_SUCCESS != (rc = opal_dss.unpack(buf, &xfer, &cnt, OPAL_BUFFER))) { ORTE_ERROR_LOG(rc); break; } cnt = 1; if (OPAL_SUCCESS != (rc = opal_dss.unpack(xfer, &host, &cnt, OPAL_STRING))) { ORTE_ERROR_LOG(rc); break; } cnt = 1; if (OPAL_SUCCESS != (rc = opal_dss.unpack(xfer, &path, &cnt, OPAL_STRING))) { ORTE_ERROR_LOG(rc); break; } cnt = 1; if (OPAL_SUCCESS != (rc = opal_dss.unpack(xfer, &length, &cnt, OPAL_INT64))) { ORTE_ERROR_LOG(rc); break; } cnt = 1; if (OPAL_SUCCESS != (rc = opal_dss.unpack(xfer, &offset, &cnt, OPAL_INT64))) { ORTE_ERROR_LOG(rc); break; } cnt = 1; if (OPAL_SUCCESS != (rc = opal_dss.unpack(xfer, &partition, &cnt, OPAL_INT64))) { ORTE_ERROR_LOG(rc); break; } OBJ_RELEASE(xfer); opal_output(0, "CHECKING PARTITION %d\n\thost %s\n\tpath %s\n\tlength: %d offset: %d", (int)partition, (NULL == host) ? "NULL" : host, path, (int)length, (int)offset); continue; /* if this is my partition, use the file data */ if (partition == (int64_t)ORTE_PROC_MY_NAME->vpid) { /* open the file */ if (NULL == (uri = opal_filename_to_uri(path, host))) { return 1; } active = true; orte_dfs.open(uri, dfs_open_cbfunc, &fd); ORTE_WAIT_FOR_COMPLETION(active); if (fd < 0) { /* hit an error */ return 1; } /* position it */ active = true; orte_dfs.seek(fd, offset, SEEK_SET, dfs_seek_cbfunc, NULL); ORTE_WAIT_FOR_COMPLETION(active); /* read it */ active = true; numread = 0; orte_dfs.read(fd, buffer, length, read_cbfunc, NULL); ORTE_WAIT_FOR_COMPLETION(active); opal_output(0, "%s successfully read %d bytes", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), numread); active= true; orte_dfs.close(fd, dfs_close_cbfunc, NULL); ORTE_WAIT_FOR_COMPLETION(active); goto complete; } } } } complete: orte_finalize(); return 0; }