1
1

Add an API to get a remote file's size. Separate dfs cmds from returned data messages so daemons don't get confused.

This commit was SVN r27487.
Этот коммит содержится в:
Ralph Castain 2012-10-25 22:23:08 +00:00
родитель 51a3ec2d7b
Коммит df642f1508
7 изменённых файлов: 659 добавлений и 54 удалений

Просмотреть файл

@ -20,6 +20,7 @@
#ifdef HAVE_FCNTL_H
#include <fcntl.h>
#endif
#include <sys/stat.h>
#include "opal/util/if.h"
#include "opal/util/output.h"
@ -47,7 +48,10 @@ static void dfs_open(char *uri,
orte_dfs_open_callback_fn_t cbfunc,
void *cbdata);
static void dfs_close(int fd);
static void dfs_seek(int fd, size_t offset);
static void dfs_get_file_size(int fd,
orte_dfs_size_callback_fn_t cbfunc,
void *cbdata);
static void dfs_seek(int fd, size_t offset, int whence);
static void dfs_read(int fd, uint8_t *buffer,
long length,
orte_dfs_read_callback_fn_t cbfunc,
@ -61,6 +65,7 @@ orte_dfs_base_module_t orte_dfs_app_module = {
finalize,
dfs_open,
dfs_close,
dfs_get_file_size,
dfs_seek,
dfs_read
};
@ -79,7 +84,7 @@ static int init(void)
OBJ_CONSTRUCT(&requests, opal_list_t);
OBJ_CONSTRUCT(&active_files, opal_list_t);
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_DFS,
ORTE_RML_TAG_DFS_DATA,
ORTE_RML_PERSISTENT,
recv_dfs,
NULL))) {
@ -92,7 +97,7 @@ static int finalize(void)
{
opal_list_item_t *item;
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DFS);
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DFS_DATA);
while (NULL != (item = opal_list_remove_first(&requests))) {
OBJ_RELEASE(item);
}
@ -209,6 +214,48 @@ static void recv_dfs(int status, orte_process_name_t* sender,
OBJ_RELEASE(dfs);
break;
case ORTE_DFS_SIZE_CMD:
/* unpack the request id for this request */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) {
ORTE_ERROR_LOG(rc);
return;
}
/* search our list of requests to find the matching one */
dfs = NULL;
for (item = opal_list_get_first(&requests);
item != opal_list_get_end(&requests);
item = opal_list_get_next(item)) {
dptr = (orte_dfs_request_t*)item;
if (dptr->id == rid) {
/* request was fulfilled, so remove it */
opal_list_remove_item(&requests, item);
dfs = dptr;
break;
}
}
if (NULL == dfs) {
opal_output_verbose(1, orte_dfs_base.output,
"%s recvd size - no corresponding request found for local fd %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), local_fd);
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
return;
}
/* get the size */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &i64, &cnt, OPAL_INT64))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(dfs);
return;
}
/* pass them back to the original caller */
if (NULL != dfs->size_cbfunc) {
dfs->size_cbfunc(i64, dfs->cbdata);
}
/* release the request */
OBJ_RELEASE(dfs);
break;
case ORTE_DFS_READ_CMD:
/* unpack the request id for this read */
cnt = 1;
@ -296,7 +343,7 @@ static void open_local_file(orte_dfs_request_t *dfs)
trk = OBJ_NEW(orte_dfs_tracker_t);
trk->requestor.jobid = ORTE_PROC_MY_NAME->jobid;
trk->requestor.vpid = ORTE_PROC_MY_NAME->vpid;
trk->filename = strdup(dfs->uri);
trk->filename = strdup(filename);
/* define the local fd */
trk->local_fd = local_fd++;
/* record the remote file descriptor */
@ -433,7 +480,7 @@ static void process_opens(int fd, short args, void *cbdata)
filename);
/* send it */
if (0 > (rc = orte_rml.send_buffer_nb(&daemon, buffer,
ORTE_RML_TAG_DFS, 0,
ORTE_RML_TAG_DFS_CMD, 0,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buffer);
@ -530,7 +577,7 @@ static void process_close(int fd, short args, void *cbdata)
trk->local_fd);
/* send it */
if (0 > (rc = orte_rml.send_buffer_nb(&trk->host_daemon, buffer,
ORTE_RML_TAG_DFS, 0,
ORTE_RML_TAG_DFS_CMD, 0,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buffer);
@ -559,6 +606,130 @@ static void dfs_close(int fd)
ORTE_DFS_POST_REQUEST(dfs, process_close);
}
/*
 * Event handler for a posted "get file size" request (app module).
 *
 * cbdata is the orte_dfs_request_t built by dfs_get_file_size(); the
 * fd and args arguments are not used.  The request's local_fd is
 * looked up in active_files:
 *   - if the file is hosted by our own daemon, it is stat'ed here and
 *     the size (or -1 if stat fails) is handed to size_cbfunc;
 *   - otherwise a size command carrying (cmd, request id, remote fd)
 *     is sent to the hosting daemon and the request is parked on the
 *     requests list until the reply is matched by id.
 *
 * On every failure path the caller's size_cbfunc (if any) is invoked
 * with -1 so that a caller waiting on the callback is not left hanging,
 * and the request object is released.
 */
static void process_sizes(int fd, short args, void *cbdata)
{
    orte_dfs_request_t *size_dfs = (orte_dfs_request_t*)cbdata;
    orte_dfs_tracker_t *tptr, *trk;
    opal_list_item_t *item;
    opal_buffer_t *buffer;
    int rc;
    struct stat buf;

    opal_output_verbose(1, orte_dfs_base.output,
                        "%s processing get_size on fd %d",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        size_dfs->local_fd);

    /* look in our local records for this fd */
    trk = NULL;
    for (item = opal_list_get_first(&active_files);
         item != opal_list_get_end(&active_files);
         item = opal_list_get_next(item)) {
        tptr = (orte_dfs_tracker_t*)item;
        if (tptr->local_fd == size_dfs->local_fd) {
            trk = tptr;
            break;
        }
    }
    if (NULL == trk) {
        ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
        /* unknown fd - report the failure to the caller */
        if (NULL != size_dfs->size_cbfunc) {
            size_dfs->size_cbfunc(-1, size_dfs->cbdata);
        }
        goto complete;
    }

    /* if the file is hosted by our own daemon, we can get the size directly */
    if (trk->host_daemon.vpid == ORTE_PROC_MY_DAEMON->vpid) {
        /* stat the file and get its size */
        if (0 > stat(trk->filename, &buf)) {
            /* cannot stat file */
            opal_output_verbose(1, orte_dfs_base.output,
                                "%s could not stat %s",
                                ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                                trk->filename);
            if (NULL != size_dfs->size_cbfunc) {
                size_dfs->size_cbfunc(-1, size_dfs->cbdata);
            }
        } else {
            if (NULL != size_dfs->size_cbfunc) {
                size_dfs->size_cbfunc(buf.st_size, size_dfs->cbdata);
            }
        }
        goto complete;
    }

    /* add this request to our local list so we can
     * match it with the returned response when it comes
     */
    size_dfs->id = req_id++;
    opal_list_append(&requests, &size_dfs->super);

    /* setup a message for the daemon telling
     * them which file we want the size of
     */
    buffer = OBJ_NEW(opal_buffer_t);
    if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &size_dfs->cmd, 1, ORTE_DFS_CMD_T))) {
        ORTE_ERROR_LOG(rc);
        goto failed;
    }
    /* pass the request id */
    if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &size_dfs->id, 1, OPAL_UINT64))) {
        ORTE_ERROR_LOG(rc);
        goto failed;
    }
    /* pass the fd as known to the hosting daemon */
    if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &trk->remote_fd, 1, OPAL_INT))) {
        ORTE_ERROR_LOG(rc);
        goto failed;
    }

    opal_output_verbose(1, orte_dfs_base.output,
                        "%s sending get_size request to %s for fd %d",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        ORTE_NAME_PRINT(&trk->host_daemon),
                        trk->local_fd);
    /* send it */
    if (0 > (rc = orte_rml.send_buffer_nb(&trk->host_daemon, buffer,
                                          ORTE_RML_TAG_DFS_CMD, 0,
                                          orte_rml_send_callback, NULL))) {
        ORTE_ERROR_LOG(rc);
        goto failed;
    }
    /* leave the request on the list until the reply arrives */
    return;

 failed:
    /* the message never left: drop the buffer, forget the pending
     * request, and tell the caller that no size is available
     */
    OBJ_RELEASE(buffer);
    opal_list_remove_item(&requests, &size_dfs->super);
    if (NULL != size_dfs->size_cbfunc) {
        size_dfs->size_cbfunc(-1, size_dfs->cbdata);
    }

 complete:
    OBJ_RELEASE(size_dfs);
}
/*
 * Module entry point: ask for the size of the file behind local fd.
 *
 * Nothing is done inline; a size request describing the call is built
 * and posted so that process_sizes() handles it.  The result is
 * delivered through cbfunc together with the caller's cbdata.
 */
static void dfs_get_file_size(int fd,
                              orte_dfs_size_callback_fn_t cbfunc,
                              void *cbdata)
{
    orte_dfs_request_t *req;

    opal_output_verbose(1, orte_dfs_base.output,
                        "%s get_size called on fd %d",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), fd);

    /* describe the request: who to call back, and for which file */
    req = OBJ_NEW(orte_dfs_request_t);
    req->cbdata = cbdata;
    req->size_cbfunc = cbfunc;
    req->local_fd = fd;
    req->cmd = ORTE_DFS_SIZE_CMD;

    /* hand it off for processing */
    ORTE_DFS_POST_REQUEST(req, process_sizes);
}
static void process_seeks(int fd, short args, void *cbdata)
{
orte_dfs_request_t *seek_dfs = (orte_dfs_request_t*)cbdata;
@ -590,9 +761,15 @@ static void process_seeks(int fd, short args, void *cbdata)
return;
}
/* if the file is local, execute the seek on it */
/* if the file is local, execute the seek on it - we
* stuck the "whence" value in the remote_fd
*/
if (trk->host_daemon.vpid == ORTE_PROC_MY_DAEMON->vpid) {
lseek(trk->remote_fd, seek_dfs->read_length, SEEK_SET);
opal_output_verbose(1, orte_dfs_base.output,
"%s local seek on fd %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
seek_dfs->local_fd);
lseek(trk->remote_fd, seek_dfs->read_length, seek_dfs->remote_fd);
goto complete;
}
@ -613,7 +790,11 @@ static void process_seeks(int fd, short args, void *cbdata)
ORTE_ERROR_LOG(rc);
goto complete;
}
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &seek_dfs->remote_fd, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
goto complete;
}
opal_output_verbose(1, orte_dfs_base.output,
"%s sending seek file request to %s for fd %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
@ -621,7 +802,7 @@ static void process_seeks(int fd, short args, void *cbdata)
trk->local_fd);
/* send it */
if (0 > (rc = orte_rml.send_buffer_nb(&trk->host_daemon, buffer,
ORTE_RML_TAG_DFS, 0,
ORTE_RML_TAG_DFS_CMD, 0,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buffer);
@ -633,7 +814,7 @@ static void process_seeks(int fd, short args, void *cbdata)
}
static void dfs_seek(int fd, size_t offset)
static void dfs_seek(int fd, size_t offset, int whence)
{
orte_dfs_request_t *dfs;
@ -645,6 +826,7 @@ static void dfs_seek(int fd, size_t offset)
dfs->cmd = ORTE_DFS_SEEK_CMD;
dfs->local_fd = fd;
dfs->read_length = offset;
dfs->remote_fd = whence;
/* post it for processing */
ORTE_DFS_POST_REQUEST(dfs, process_seeks);
@ -722,7 +904,7 @@ static void process_reads(int fd, short args, void *cbdata)
trk->local_fd);
/* send it */
if (0 > (rc = orte_rml.send_buffer_nb(&trk->host_daemon, buffer,
ORTE_RML_TAG_DFS, 0,
ORTE_RML_TAG_DFS_CMD, 0,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buffer);

Просмотреть файл

@ -67,6 +67,7 @@ typedef struct {
uint8_t *read_buffer;
long read_length;
orte_dfs_open_callback_fn_t open_cbfunc;
orte_dfs_size_callback_fn_t size_cbfunc;
orte_dfs_read_callback_fn_t read_cbfunc;
void *cbdata;
} orte_dfs_request_t;

Просмотреть файл

@ -13,6 +13,10 @@
#include "orte_config.h"
#ifdef HAVE_FCNTL_H
#include <fcntl.h>
#endif
#include "opal/mca/mca.h"
#include "opal/mca/base/base.h"
@ -62,11 +66,21 @@ typedef void (*orte_dfs_base_module_open_fn_t)(char *uri,
*/
typedef void (*orte_dfs_base_module_close_fn_t)(int fd);
/* Get the size of a file
*
*/
typedef void (*orte_dfs_base_module_get_file_size_fn_t)(int fd,
orte_dfs_size_callback_fn_t cbfunc,
void *cbdata);
/* Position a file
*
* Move the read position in the file to the specified byte number
* relative to the location specified by whence:
* SEEK_SET => from beginning of file
* SEEK_CUR => from current location
*/
typedef void (*orte_dfs_base_module_seek_fn_t)(int fd, size_t offset);
typedef void (*orte_dfs_base_module_seek_fn_t)(int fd, size_t offset, int whence);
/* Read bytes from a possibly remote file
*
@ -94,10 +108,11 @@ struct orte_dfs_base_module_1_0_0_t {
/** Finalization Function */
orte_dfs_base_module_finalize_fn_t finalize;
orte_dfs_base_module_open_fn_t open;
orte_dfs_base_module_close_fn_t close;
orte_dfs_base_module_seek_fn_t seek;
orte_dfs_base_module_read_fn_t read;
orte_dfs_base_module_open_fn_t open;
orte_dfs_base_module_close_fn_t close;
orte_dfs_base_module_get_file_size_fn_t get_file_size;
orte_dfs_base_module_seek_fn_t seek;
orte_dfs_base_module_read_fn_t read;
};
typedef struct orte_dfs_base_module_1_0_0_t orte_dfs_base_module_1_0_0_t;
typedef orte_dfs_base_module_1_0_0_t orte_dfs_base_module_t;

Просмотреть файл

@ -20,11 +20,14 @@ typedef uint8_t orte_dfs_cmd_t;
#define ORTE_DFS_OPEN_CMD 1
#define ORTE_DFS_CLOSE_CMD 2
#define ORTE_DFS_SEEK_CMD 3
#define ORTE_DFS_READ_CMD 4
#define ORTE_DFS_SIZE_CMD 3
#define ORTE_DFS_SEEK_CMD 4
#define ORTE_DFS_READ_CMD 5
typedef void (*orte_dfs_open_callback_fn_t)(int fd, void *cbdata);
typedef void (*orte_dfs_size_callback_fn_t)(long size, void *cbdata);
typedef void (*orte_dfs_read_callback_fn_t)(long status,
uint8_t *buffer,
void *cbdata);

Просмотреть файл

@ -20,6 +20,7 @@
#ifdef HAVE_FCNTL_H
#include <fcntl.h>
#endif
#include <sys/stat.h>
#include "opal/util/if.h"
#include "opal/util/output.h"
@ -54,7 +55,10 @@ static void dfs_open(char *uri,
orte_dfs_open_callback_fn_t cbfunc,
void *cbdata);
static void dfs_close(int fd);
static void dfs_seek(int fd, size_t offset);
static void dfs_get_file_size(int fd,
orte_dfs_size_callback_fn_t cbfunc,
void *cbdata);
static void dfs_seek(int fd, size_t offset, int whence);
static void dfs_read(int fd, uint8_t *buffer,
long length,
orte_dfs_read_callback_fn_t cbfunc,
@ -68,6 +72,7 @@ orte_dfs_base_module_t orte_dfs_orted_module = {
finalize,
dfs_open,
dfs_close,
dfs_get_file_size,
dfs_seek,
dfs_read
};
@ -75,9 +80,12 @@ orte_dfs_base_module_t orte_dfs_orted_module = {
static opal_list_t requests, active_files;
static int local_fd = 0;
static uint64_t req_id = 0;
static void recv_dfs(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata);
static void recv_dfs_cmd(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata);
static void recv_dfs_data(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata);
static int init(void)
{
@ -86,9 +94,16 @@ static int init(void)
OBJ_CONSTRUCT(&requests, opal_list_t);
OBJ_CONSTRUCT(&active_files, opal_list_t);
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_DFS,
ORTE_RML_TAG_DFS_CMD,
ORTE_RML_PERSISTENT,
recv_dfs,
recv_dfs_cmd,
NULL))) {
ORTE_ERROR_LOG(rc);
}
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_DFS_DATA,
ORTE_RML_PERSISTENT,
recv_dfs_data,
NULL))) {
ORTE_ERROR_LOG(rc);
}
@ -99,7 +114,8 @@ static int finalize(void)
{
opal_list_item_t *item;
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DFS);
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DFS_CMD);
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DFS_DATA);
while (NULL != (item = opal_list_remove_first(&requests))) {
OBJ_RELEASE(item);
}
@ -278,7 +294,7 @@ static void process_opens(int fd, short args, void *cbdata)
filename);
/* send it */
if (0 > (rc = orte_rml.send_buffer_nb(&node->daemon->name, buffer,
ORTE_RML_TAG_DFS, 0,
ORTE_RML_TAG_DFS_CMD, 0,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buffer);
@ -375,7 +391,7 @@ static void process_close(int fd, short args, void *cbdata)
trk->local_fd);
/* send it */
if (0 > (rc = orte_rml.send_buffer_nb(&trk->host_daemon, buffer,
ORTE_RML_TAG_DFS, 0,
ORTE_RML_TAG_DFS_CMD, 0,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buffer);
@ -404,6 +420,110 @@ static void dfs_close(int fd)
ORTE_DFS_POST_REQUEST(dfs, process_close);
}
static void process_sizes(int fd, short args, void *cbdata)
{
orte_dfs_request_t *size_dfs = (orte_dfs_request_t*)cbdata;
orte_dfs_tracker_t *tptr, *trk;
opal_list_item_t *item;
opal_buffer_t *buffer;
int rc;
struct stat buf;
opal_output_verbose(1, orte_dfs_base.output,
"%s processing get_size on fd %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
size_dfs->local_fd);
/* look in our local records for this fd */
trk = NULL;
for (item = opal_list_get_first(&active_files);
item != opal_list_get_end(&active_files);
item = opal_list_get_next(item)) {
tptr = (orte_dfs_tracker_t*)item;
if (tptr->local_fd == size_dfs->local_fd) {
trk = tptr;
break;
}
}
if (NULL == trk) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
OBJ_RELEASE(size_dfs);
return;
}
/* if the file is local, execute the seek on it - we
* stuck the "whence" value in the remote_fd
*/
if (trk->host_daemon.vpid == ORTE_PROC_MY_DAEMON->vpid) {
/* stat the file and get its size */
if (0 > stat(trk->filename, &buf)) {
/* cannot stat file */
opal_output_verbose(1, orte_dfs_base.output,
"%s could not stat %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
trk->filename);
if (NULL != size_dfs->size_cbfunc) {
size_dfs->size_cbfunc(-1, size_dfs->cbdata);
}
}
goto complete;
}
/* setup a message for the daemon telling
* them what file to get the size of
*/
buffer = OBJ_NEW(opal_buffer_t);
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &size_dfs->cmd, 1, ORTE_DFS_CMD_T))) {
ORTE_ERROR_LOG(rc);
goto complete;
}
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &trk->remote_fd, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
goto complete;
}
opal_output_verbose(1, orte_dfs_base.output,
"%s sending get_size request to %s for fd %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&trk->host_daemon),
trk->local_fd);
/* send it */
if (0 > (rc = orte_rml.send_buffer_nb(&trk->host_daemon, buffer,
ORTE_RML_TAG_DFS_CMD, 0,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buffer);
if (NULL != size_dfs->size_cbfunc) {
size_dfs->size_cbfunc(-1, size_dfs->cbdata);
}
goto complete;
}
complete:
OBJ_RELEASE(size_dfs);
}
/*
 * Module entry point: ask for the size of the file behind local fd.
 *
 * Nothing is done inline; a size request describing the call is built
 * and posted so that process_sizes() handles it.  The result is
 * delivered through cbfunc together with the caller's cbdata.
 */
static void dfs_get_file_size(int fd,
                              orte_dfs_size_callback_fn_t cbfunc,
                              void *cbdata)
{
    orte_dfs_request_t *req;

    opal_output_verbose(1, orte_dfs_base.output,
                        "%s get_size called on fd %d",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), fd);

    /* describe the request: who to call back, and for which file */
    req = OBJ_NEW(orte_dfs_request_t);
    req->cbdata = cbdata;
    req->size_cbfunc = cbfunc;
    req->local_fd = fd;
    req->cmd = ORTE_DFS_SIZE_CMD;

    /* hand it off for processing */
    ORTE_DFS_POST_REQUEST(req, process_sizes);
}
static void process_seeks(int fd, short args, void *cbdata)
{
orte_dfs_request_t *seek_dfs = (orte_dfs_request_t*)cbdata;
@ -435,9 +555,15 @@ static void process_seeks(int fd, short args, void *cbdata)
return;
}
/* if the file is local, execute the seek on it */
/* if the file is local, execute the seek on it - we
* stuck the "whence" value in the remote_fd
*/
if (trk->host_daemon.vpid == ORTE_PROC_MY_DAEMON->vpid) {
lseek(trk->remote_fd, seek_dfs->read_length, SEEK_SET);
opal_output_verbose(1, orte_dfs_base.output,
"%s local seek on fd %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
seek_dfs->local_fd);
lseek(trk->remote_fd, seek_dfs->read_length, seek_dfs->remote_fd);
goto complete;
}
@ -458,7 +584,11 @@ static void process_seeks(int fd, short args, void *cbdata)
ORTE_ERROR_LOG(rc);
goto complete;
}
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &seek_dfs->remote_fd, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
goto complete;
}
opal_output_verbose(1, orte_dfs_base.output,
"%s sending seek file request to %s for fd %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
@ -466,7 +596,7 @@ static void process_seeks(int fd, short args, void *cbdata)
trk->local_fd);
/* send it */
if (0 > (rc = orte_rml.send_buffer_nb(&trk->host_daemon, buffer,
ORTE_RML_TAG_DFS, 0,
ORTE_RML_TAG_DFS_CMD, 0,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buffer);
@ -478,7 +608,7 @@ static void process_seeks(int fd, short args, void *cbdata)
}
static void dfs_seek(int fd, size_t offset)
static void dfs_seek(int fd, size_t offset, int whence)
{
orte_dfs_request_t *dfs;
@ -490,6 +620,7 @@ static void dfs_seek(int fd, size_t offset)
dfs->cmd = ORTE_DFS_SEEK_CMD;
dfs->local_fd = fd;
dfs->read_length = offset;
dfs->remote_fd = whence;
/* post it for processing */
ORTE_DFS_POST_REQUEST(dfs, process_seeks);
@ -567,7 +698,7 @@ static void process_reads(int fd, short args, void *cbdata)
trk->local_fd);
/* send it */
if (0 > (rc = orte_rml.send_buffer_nb(&trk->host_daemon, buffer,
ORTE_RML_TAG_DFS, 0,
ORTE_RML_TAG_DFS_CMD, 0,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buffer);
@ -604,9 +735,9 @@ static void dfs_read(int fd, uint8_t *buffer,
/* receives take place in an event, so we are free to process
* the request list without fear of getting things out-of-order
*/
static void recv_dfs(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata)
static void recv_dfs_cmd(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata)
{
orte_dfs_cmd_t cmd;
int32_t cnt;
@ -617,6 +748,8 @@ static void recv_dfs(int status, orte_process_name_t* sender,
int64_t i64, bytes_read;
uint8_t *read_buf;
uint64_t rid;
int whence;
struct stat buf;
/* unpack the command */
cnt = 1;
@ -679,7 +812,7 @@ static void recv_dfs(int status, orte_process_name_t* sender,
}
/* send it */
if (0 > (rc = orte_rml.send_buffer_nb(sender, buffer,
ORTE_RML_TAG_DFS, 0,
ORTE_RML_TAG_DFS_DATA, 0,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buffer);
@ -710,6 +843,63 @@ static void recv_dfs(int status, orte_process_name_t* sender,
}
break;
case ORTE_DFS_SIZE_CMD:
/* unpack their request id */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) {
ORTE_ERROR_LOG(rc);
return;
}
/* unpack our fd */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &my_fd, &cnt, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
return;
}
/* find the corresponding tracker */
i64 = -1;
for (item = opal_list_get_first(&requests);
item != opal_list_get_end(&requests);
item = opal_list_get_next(item)) {
trk = (orte_dfs_tracker_t*)item;
if (my_fd == trk->local_fd) {
/* stat the file and get its size */
if (0 > stat(trk->filename, &buf)) {
/* cannot stat file */
opal_output_verbose(1, orte_dfs_base.output,
"%s could not stat %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
trk->filename);
} else {
i64 = buf.st_size;
}
break;
}
}
/* construct the return message */
buffer = OBJ_NEW(opal_buffer_t);
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &cmd, 1, ORTE_DFS_CMD_T))) {
ORTE_ERROR_LOG(rc);
return;
}
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &rid, 1, OPAL_UINT64))) {
ORTE_ERROR_LOG(rc);
return;
}
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &i64, 1, OPAL_INT64))) {
ORTE_ERROR_LOG(rc);
return;
}
/* send it */
if (0 > (rc = orte_rml.send_buffer_nb(sender, buffer,
ORTE_RML_TAG_DFS_DATA, 0,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buffer);
return;
}
break;
case ORTE_DFS_SEEK_CMD:
/* unpack our fd */
cnt = 1;
@ -723,6 +913,12 @@ static void recv_dfs(int status, orte_process_name_t* sender,
ORTE_ERROR_LOG(rc);
return;
}
/* unpack the whence */
cnt = 1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &whence, &cnt, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
return;
}
/* find the corresponding tracker - we do this to ensure
* that the local fd we were sent is actually open
*/
@ -732,7 +928,7 @@ static void recv_dfs(int status, orte_process_name_t* sender,
trk = (orte_dfs_tracker_t*)item;
if (my_fd == trk->local_fd) {
/* do the seek */
lseek(my_fd, (long)i64, SEEK_SET);
lseek(my_fd, (long)i64, whence);
break;
}
}
@ -813,7 +1009,7 @@ static void recv_dfs(int status, orte_process_name_t* sender,
(long)bytes_read,
ORTE_NAME_PRINT(sender));
if (0 > (rc = orte_rml.send_buffer_nb(sender, buffer,
ORTE_RML_TAG_DFS, 0,
ORTE_RML_TAG_DFS_DATA, 0,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buffer);
@ -829,3 +1025,203 @@ static void recv_dfs(int status, orte_process_name_t* sender,
break;
}
}
/*
 * Receive handler for data messages (replies) in the orted module.
 *
 * Each message starts with the command it answers, followed by the
 * request id that was sent with the original command.  The id is used
 * to find (and remove) the pending request on the requests list, and
 * the remainder of the payload is handed to that request's callback:
 *   OPEN - remote fd; a negative value is passed straight to
 *          open_cbfunc as an error, otherwise a tracker is created on
 *          active_files and the new local fd is passed to open_cbfunc
 *   SIZE - int64 size, passed to size_cbfunc
 *   READ - int64 byte count followed by that many bytes, unpacked into
 *          the request's read_buffer and passed to read_cbfunc
 *
 * The matched request is released once its callback has been invoked
 * or when the rest of its payload cannot be unpacked.  The status, tag
 * and cbdata arguments are not used.
 */
static void recv_dfs_data(int status, orte_process_name_t* sender,
                          opal_buffer_t* buffer, orte_rml_tag_t tag,
                          void* cbdata)
{
    orte_dfs_cmd_t cmd;
    int32_t cnt;
    orte_dfs_request_t *dfs, *dptr;
    opal_list_item_t *item;
    int remote_fd, rc;
    int64_t i64;
    uint64_t rid;
    orte_dfs_tracker_t *trk;

    /* unpack the command this message is responding to */
    cnt = 1;
    if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &cmd, &cnt, ORTE_DFS_CMD_T))) {
        ORTE_ERROR_LOG(rc);
        return;
    }

    opal_output_verbose(1, orte_dfs_base.output,
                        "%s recvd:data cmd %d from sender %s",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)cmd,
                        ORTE_NAME_PRINT(sender));

    switch (cmd) {
    case ORTE_DFS_OPEN_CMD:
        /* unpack the request id */
        cnt = 1;
        if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) {
            ORTE_ERROR_LOG(rc);
            return;
        }
        /* unpack the remote fd */
        cnt = 1;
        if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &remote_fd, &cnt, OPAL_INT))) {
            ORTE_ERROR_LOG(rc);
            return;
        }
        /* search our list of requests to find the matching one */
        dfs = NULL;
        for (item = opal_list_get_first(&requests);
             item != opal_list_get_end(&requests);
             item = opal_list_get_next(item)) {
            dptr = (orte_dfs_request_t*)item;
            if (dptr->id == rid) {
                /* as the request has been fulfilled, remove it */
                opal_list_remove_item(&requests, item);
                dfs = dptr;
                break;
            }
        }
        if (NULL == dfs) {
            opal_output_verbose(1, orte_dfs_base.output,
                                "%s recvd:data open file - no corresponding request found for local fd %d",
                                ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), local_fd);
            ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
            return;
        }

        /* if the remote_fd < 0, then we had an error, so return
         * the error value to the caller
         */
        if (remote_fd < 0) {
            opal_output_verbose(1, orte_dfs_base.output,
                                "%s recvd:data open file response error file %s [error: %d]",
                                ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                                dfs->uri, remote_fd);
            if (NULL != dfs->open_cbfunc) {
                dfs->open_cbfunc(remote_fd, dfs->cbdata);
            }
            /* release the request */
            OBJ_RELEASE(dfs);
            return;
        }
        /* otherwise, create a tracker for this file */
        trk = OBJ_NEW(orte_dfs_tracker_t);
        trk->requestor.jobid = ORTE_PROC_MY_NAME->jobid;
        trk->requestor.vpid = ORTE_PROC_MY_NAME->vpid;
        trk->host_daemon.jobid = sender->jobid;
        trk->host_daemon.vpid = sender->vpid;
        trk->filename = strdup(dfs->uri);
        /* define the local fd */
        trk->local_fd = local_fd++;
        /* record the remote file descriptor */
        trk->remote_fd = remote_fd;
        /* add it to our list of active files */
        opal_list_append(&active_files, &trk->super);
        /* return the local_fd to the caller for
         * subsequent operations
         */
        opal_output_verbose(1, orte_dfs_base.output,
                            "%s recvd:data open file completed for file %s [local fd: %d remote fd: %d]",
                            ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                            dfs->uri, trk->local_fd, remote_fd);
        if (NULL != dfs->open_cbfunc) {
            dfs->open_cbfunc(trk->local_fd, dfs->cbdata);
        }
        /* release the request */
        OBJ_RELEASE(dfs);
        break;

    case ORTE_DFS_SIZE_CMD:
        /* unpack the request id for this request */
        cnt = 1;
        if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) {
            ORTE_ERROR_LOG(rc);
            return;
        }
        /* search our list of requests to find the matching one */
        dfs = NULL;
        for (item = opal_list_get_first(&requests);
             item != opal_list_get_end(&requests);
             item = opal_list_get_next(item)) {
            dptr = (orte_dfs_request_t*)item;
            if (dptr->id == rid) {
                /* request was fulfilled, so remove it */
                opal_list_remove_item(&requests, item);
                dfs = dptr;
                break;
            }
        }
        if (NULL == dfs) {
            opal_output_verbose(1, orte_dfs_base.output,
                                "%s recvd:data size - no corresponding request found for local fd %d",
                                ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), local_fd);
            ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
            return;
        }
        /* get the size */
        cnt = 1;
        if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &i64, &cnt, OPAL_INT64))) {
            ORTE_ERROR_LOG(rc);
            OBJ_RELEASE(dfs);
            return;
        }
        /* pass it back to the original caller - guard on the size
         * callback, which is the one a size request carries and the
         * one invoked here
         */
        if (NULL != dfs->size_cbfunc) {
            dfs->size_cbfunc(i64, dfs->cbdata);
        }
        /* release the request */
        OBJ_RELEASE(dfs);
        break;

    case ORTE_DFS_READ_CMD:
        /* unpack the request id for this read */
        cnt = 1;
        if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) {
            ORTE_ERROR_LOG(rc);
            return;
        }
        /* search our list of requests to find the matching one */
        dfs = NULL;
        for (item = opal_list_get_first(&requests);
             item != opal_list_get_end(&requests);
             item = opal_list_get_next(item)) {
            dptr = (orte_dfs_request_t*)item;
            if (dptr->id == rid) {
                /* request was fulfilled, so remove it */
                opal_list_remove_item(&requests, item);
                dfs = dptr;
                break;
            }
        }
        if (NULL == dfs) {
            opal_output_verbose(1, orte_dfs_base.output,
                                "%s recvd:data read - no corresponding request found for local fd %d",
                                ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), local_fd);
            ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
            return;
        }
        /* get the bytes read */
        cnt = 1;
        if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &i64, &cnt, OPAL_INT64))) {
            ORTE_ERROR_LOG(rc);
            OBJ_RELEASE(dfs);
            return;
        }
        if (0 < i64) {
            /* NOTE(review): the byte count comes from the sender and is
             * not checked against the size of read_buffer here - confirm
             * the sender never returns more than was requested
             */
            cnt = i64;
            if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, dfs->read_buffer, &cnt, OPAL_UINT8))) {
                ORTE_ERROR_LOG(rc);
                OBJ_RELEASE(dfs);
                return;
            }
        }
        /* pass them back to the original caller */
        if (NULL != dfs->read_cbfunc) {
            dfs->read_cbfunc(i64, dfs->read_buffer, dfs->cbdata);
        }
        /* release the request */
        OBJ_RELEASE(dfs);
        break;

    default:
        opal_output(0, "ORTED:DFS:RECV:DATA WTF");
        break;
    }
}

Просмотреть файл

@ -131,7 +131,8 @@ BEGIN_C_DECLS
#define ORTE_RML_TAG_FAILURE_NOTICE 44
/* distributed file system */
#define ORTE_RML_TAG_DFS 45
#define ORTE_RML_TAG_DFS_CMD 45
#define ORTE_RML_TAG_DFS_DATA 46
#define ORTE_RML_TAG_MAX 100

Просмотреть файл

@ -5,6 +5,7 @@
*/
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include "opal/util/output.h"
#include "opal/util/uri.h"
@ -22,6 +23,8 @@ static bool active;
static bool read_active;
static int numread = 0;
#define READ_SIZE 1000
static void dfs_open_cbfunc(int fd, void *cbdata)
{
int *remote_fd = (int*)cbdata;
@ -32,6 +35,13 @@ static void dfs_open_cbfunc(int fd, void *cbdata)
}
/*
 * Size callback used by the test program: print the size that was
 * reported and clear the "active" flag so the wait loop in main()
 * stops spinning.  cbdata is not used.
 */
static void dfs_size_cbfunc(long size, void *cbdata)
{
    (void)cbdata;

    opal_output(0, "GOT FILE SIZE %ld", size);

    /* the pending operation is done */
    active = false;
}
static void read_cbfunc(long status, uint8_t *buffer, void *cbdata)
{
opal_output(0, "GOT READ STATUS %d", (int)status);
@ -42,7 +52,7 @@ static void read_cbfunc(long status, uint8_t *buffer, void *cbdata)
}
numread += status;
if (status < 100) {
if (status < READ_SIZE) {
read_active = false;
opal_output(0, "EOF RECEIVED: read total of %d bytes", numread);
active = false;
@ -56,8 +66,7 @@ int main(int argc, char* argv[])
int rc;
int fd;
char *uri, *host;
char *testname, *testhost;
uint8_t buffer[1000];
uint8_t buffer[READ_SIZE];
/* user must provide a file to be read - the contents
* of the file will be output to stdout
@ -81,17 +90,15 @@ int main(int argc, char* argv[])
if (NULL == (uri = opal_filename_to_uri(argv[1], host))) {
return 1;
}
fprintf(stderr, "Got uri %s\n", uri);
if (NULL == (testname = opal_filename_from_uri(uri, &testhost))) {
fprintf(stderr, "Error: failed to get filename from uri %s\n", uri);
return 1;
}
fprintf(stderr, "Got file %s host %s from uri %s\n",
testname, (NULL == testhost) ? "NULL" : testhost, uri);
active = true;
orte_dfs.open(uri, dfs_open_cbfunc, &fd);
while (active) {
opal_event_loop(orte_event_base, OPAL_EVLOOP_ONCE);
}
active = true;
orte_dfs.get_file_size(fd, dfs_size_cbfunc, NULL);
while (active) {
opal_event_loop(orte_event_base, OPAL_EVLOOP_ONCE);
}
@ -101,14 +108,14 @@ int main(int argc, char* argv[])
rc = 0;
numread = 0;
while (read_active) {
fprintf(stderr, "reading next 100 bytes\n");
orte_dfs.read(fd, buffer, 100, read_cbfunc, NULL);
fprintf(stderr, "reading next %d bytes\n", READ_SIZE);
orte_dfs.read(fd, buffer, READ_SIZE, read_cbfunc, NULL);
while (active) {
opal_event_loop(orte_event_base, OPAL_EVLOOP_ONCE);
}
rc++;
if (2 == rc) {
orte_dfs.seek(fd, 326);
orte_dfs.seek(fd, 326, SEEK_SET);
}
active = true;
}