Provide for sync on seek and close DFS operations. Eliminate an unnecessary wake-up timer when using ORTE progress thread
This commit was SVN r27500.
Этот коммит содержится в:
родитель
dc93bb29ed
Коммит
4e52a15e70
@ -47,11 +47,15 @@ static int finalize(void);
|
||||
static void dfs_open(char *uri,
|
||||
orte_dfs_open_callback_fn_t cbfunc,
|
||||
void *cbdata);
|
||||
static void dfs_close(int fd);
|
||||
static void dfs_close(int fd,
|
||||
orte_dfs_close_callback_fn_t cbfunc,
|
||||
void *cbdata);
|
||||
static void dfs_get_file_size(int fd,
|
||||
orte_dfs_size_callback_fn_t cbfunc,
|
||||
void *cbdata);
|
||||
static void dfs_seek(int fd, size_t offset, int whence);
|
||||
static void dfs_seek(int fd, long offset, int whence,
|
||||
orte_dfs_seek_callback_fn_t cbfunc,
|
||||
void *cbdata);
|
||||
static void dfs_read(int fd, uint8_t *buffer,
|
||||
long length,
|
||||
orte_dfs_read_callback_fn_t cbfunc,
|
||||
@ -248,7 +252,7 @@ static void recv_dfs(int status, orte_process_name_t* sender,
|
||||
OBJ_RELEASE(dfs);
|
||||
return;
|
||||
}
|
||||
/* pass them back to the original caller */
|
||||
/* pass it back to the original caller */
|
||||
if (NULL != dfs->size_cbfunc) {
|
||||
dfs->size_cbfunc(i64, dfs->cbdata);
|
||||
}
|
||||
@ -256,6 +260,48 @@ static void recv_dfs(int status, orte_process_name_t* sender,
|
||||
OBJ_RELEASE(dfs);
|
||||
break;
|
||||
|
||||
case ORTE_DFS_SEEK_CMD:
|
||||
/* unpack the request id for this read */
|
||||
cnt = 1;
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return;
|
||||
}
|
||||
/* search our list of requests to find the matching one */
|
||||
dfs = NULL;
|
||||
for (item = opal_list_get_first(&requests);
|
||||
item != opal_list_get_end(&requests);
|
||||
item = opal_list_get_next(item)) {
|
||||
dptr = (orte_dfs_request_t*)item;
|
||||
if (dptr->id == rid) {
|
||||
/* request was fulfilled, so remove it */
|
||||
opal_list_remove_item(&requests, item);
|
||||
dfs = dptr;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (NULL == dfs) {
|
||||
opal_output_verbose(1, orte_dfs_base.output,
|
||||
"%s recvd seek - no corresponding request found for local fd %d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), local_fd);
|
||||
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
||||
return;
|
||||
}
|
||||
/* get the returned offset/status */
|
||||
cnt = 1;
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &i64, &cnt, OPAL_INT64))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_RELEASE(dfs);
|
||||
return;
|
||||
}
|
||||
/* pass it back to the original caller */
|
||||
if (NULL != dfs->seek_cbfunc) {
|
||||
dfs->seek_cbfunc(i64, dfs->cbdata);
|
||||
}
|
||||
/* release the request */
|
||||
OBJ_RELEASE(dfs);
|
||||
break;
|
||||
|
||||
case ORTE_DFS_READ_CMD:
|
||||
/* unpack the request id for this read */
|
||||
cnt = 1;
|
||||
@ -547,6 +593,9 @@ static void process_close(int fd, short args, void *cbdata)
|
||||
}
|
||||
if (NULL == trk) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
||||
if (NULL != close_dfs->close_cbfunc) {
|
||||
close_dfs->close_cbfunc(close_dfs->local_fd, close_dfs->cbdata);
|
||||
}
|
||||
OBJ_RELEASE(close_dfs);
|
||||
return;
|
||||
}
|
||||
@ -587,10 +636,15 @@ static void process_close(int fd, short args, void *cbdata)
|
||||
complete:
|
||||
opal_list_remove_item(&active_files, &trk->super);
|
||||
OBJ_RELEASE(trk);
|
||||
if (NULL != close_dfs->close_cbfunc) {
|
||||
close_dfs->close_cbfunc(close_dfs->local_fd, close_dfs->cbdata);
|
||||
}
|
||||
OBJ_RELEASE(close_dfs);
|
||||
}
|
||||
|
||||
static void dfs_close(int fd)
|
||||
static void dfs_close(int fd,
|
||||
orte_dfs_close_callback_fn_t cbfunc,
|
||||
void *cbdata)
|
||||
{
|
||||
orte_dfs_request_t *dfs;
|
||||
|
||||
@ -601,6 +655,8 @@ static void dfs_close(int fd)
|
||||
dfs = OBJ_NEW(orte_dfs_request_t);
|
||||
dfs->cmd = ORTE_DFS_CLOSE_CMD;
|
||||
dfs->local_fd = fd;
|
||||
dfs->close_cbfunc = cbfunc;
|
||||
dfs->cbdata = cbdata;
|
||||
|
||||
/* post it for processing */
|
||||
ORTE_DFS_POST_REQUEST(dfs, process_close);
|
||||
@ -738,6 +794,7 @@ static void process_seeks(int fd, short args, void *cbdata)
|
||||
opal_buffer_t *buffer;
|
||||
int64_t i64;
|
||||
int rc;
|
||||
struct stat buf;
|
||||
|
||||
opal_output_verbose(1, orte_dfs_base.output,
|
||||
"%s processing seek on fd %d",
|
||||
@ -769,9 +826,46 @@ static void process_seeks(int fd, short args, void *cbdata)
|
||||
"%s local seek on fd %d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
seek_dfs->local_fd);
|
||||
lseek(trk->remote_fd, seek_dfs->read_length, seek_dfs->remote_fd);
|
||||
/* stat the file and get its size */
|
||||
if (0 > stat(trk->filename, &buf)) {
|
||||
/* cannot stat file */
|
||||
opal_output_verbose(1, orte_dfs_base.output,
|
||||
"%s could not stat %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
trk->filename);
|
||||
if (NULL != seek_dfs->seek_cbfunc) {
|
||||
seek_dfs->seek_cbfunc(-1, seek_dfs->cbdata);
|
||||
}
|
||||
} else if (buf.st_size < seek_dfs->read_length &&
|
||||
SEEK_SET == seek_dfs->remote_fd) {
|
||||
/* seek would take us past EOF */
|
||||
if (NULL != seek_dfs->seek_cbfunc) {
|
||||
seek_dfs->seek_cbfunc(-1, seek_dfs->cbdata);
|
||||
}
|
||||
} else if (buf.st_size < (off_t)(trk->location + seek_dfs->read_length) &&
|
||||
SEEK_CUR == seek_dfs->remote_fd) {
|
||||
/* seek would take us past EOF */
|
||||
if (NULL != seek_dfs->seek_cbfunc) {
|
||||
seek_dfs->seek_cbfunc(-1, seek_dfs->cbdata);
|
||||
}
|
||||
} else {
|
||||
lseek(trk->remote_fd, seek_dfs->read_length, seek_dfs->remote_fd);
|
||||
if (SEEK_SET == seek_dfs->remote_fd) {
|
||||
trk->location = seek_dfs->read_length;
|
||||
} else {
|
||||
trk->location += seek_dfs->read_length;
|
||||
}
|
||||
if (NULL != seek_dfs->seek_cbfunc) {
|
||||
seek_dfs->seek_cbfunc(seek_dfs->read_length, seek_dfs->cbdata);
|
||||
}
|
||||
}
|
||||
goto complete;
|
||||
}
|
||||
/* add this request to our local list so we can
|
||||
* match it with the returned response when it comes
|
||||
*/
|
||||
seek_dfs->id = req_id++;
|
||||
opal_list_append(&requests, &seek_dfs->super);
|
||||
|
||||
/* setup a message for the daemon telling
|
||||
* them what file to seek
|
||||
@ -781,6 +875,12 @@ static void process_seeks(int fd, short args, void *cbdata)
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto complete;
|
||||
}
|
||||
/* pass the request id */
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &seek_dfs->id, 1, OPAL_UINT64))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
opal_list_remove_item(&requests, &seek_dfs->super);
|
||||
goto complete;
|
||||
}
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &trk->remote_fd, 1, OPAL_INT))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto complete;
|
||||
@ -808,13 +908,17 @@ static void process_seeks(int fd, short args, void *cbdata)
|
||||
OBJ_RELEASE(buffer);
|
||||
goto complete;
|
||||
}
|
||||
/* leave the request */
|
||||
return;
|
||||
|
||||
complete:
|
||||
OBJ_RELEASE(seek_dfs);
|
||||
}
|
||||
|
||||
|
||||
static void dfs_seek(int fd, size_t offset, int whence)
|
||||
static void dfs_seek(int fd, long offset, int whence,
|
||||
orte_dfs_seek_callback_fn_t cbfunc,
|
||||
void *cbdata)
|
||||
{
|
||||
orte_dfs_request_t *dfs;
|
||||
|
||||
@ -827,6 +931,8 @@ static void dfs_seek(int fd, size_t offset, int whence)
|
||||
dfs->local_fd = fd;
|
||||
dfs->read_length = offset;
|
||||
dfs->remote_fd = whence;
|
||||
dfs->seek_cbfunc = cbfunc;
|
||||
dfs->cbdata = cbdata;
|
||||
|
||||
/* post it for processing */
|
||||
ORTE_DFS_POST_REQUEST(dfs, process_seeks);
|
||||
@ -862,6 +968,10 @@ static void process_reads(int fd, short args, void *cbdata)
|
||||
/* if the file is local, read the desired bytes */
|
||||
if (trk->host_daemon.vpid == ORTE_PROC_MY_DAEMON->vpid) {
|
||||
nbytes = read(trk->remote_fd, read_dfs->read_buffer, read_dfs->read_length);
|
||||
if (0 < nbytes) {
|
||||
/* update our location */
|
||||
trk->location += nbytes;
|
||||
}
|
||||
/* pass them back to the caller */
|
||||
if (NULL != read_dfs->read_cbfunc) {
|
||||
read_dfs->read_cbfunc(nbytes, read_dfs->read_buffer, read_dfs->cbdata);
|
||||
|
@ -52,6 +52,7 @@ typedef struct {
|
||||
char *filename; /* for debug purposes */
|
||||
int local_fd;
|
||||
int remote_fd;
|
||||
size_t location;
|
||||
} orte_dfs_tracker_t;
|
||||
OBJ_CLASS_DECLARATION(orte_dfs_tracker_t);
|
||||
|
||||
@ -66,9 +67,11 @@ typedef struct {
|
||||
int remote_fd;
|
||||
uint8_t *read_buffer;
|
||||
long read_length;
|
||||
orte_dfs_open_callback_fn_t open_cbfunc;
|
||||
orte_dfs_size_callback_fn_t size_cbfunc;
|
||||
orte_dfs_read_callback_fn_t read_cbfunc;
|
||||
orte_dfs_open_callback_fn_t open_cbfunc;
|
||||
orte_dfs_close_callback_fn_t close_cbfunc;
|
||||
orte_dfs_size_callback_fn_t size_cbfunc;
|
||||
orte_dfs_seek_callback_fn_t seek_cbfunc;
|
||||
orte_dfs_read_callback_fn_t read_cbfunc;
|
||||
void *cbdata;
|
||||
} orte_dfs_request_t;
|
||||
OBJ_CLASS_DECLARATION(orte_dfs_request_t);
|
||||
|
@ -81,6 +81,7 @@ static void trk_con(orte_dfs_tracker_t *trk)
|
||||
trk->host_daemon.jobid = ORTE_JOBID_INVALID;
|
||||
trk->host_daemon.vpid = ORTE_VPID_INVALID;
|
||||
trk->filename = NULL;
|
||||
trk->location = 0;
|
||||
}
|
||||
static void trk_des(orte_dfs_tracker_t *trk)
|
||||
{
|
||||
@ -100,6 +101,9 @@ static void req_const(orte_dfs_request_t *dfs)
|
||||
dfs->read_length = -1;
|
||||
dfs->read_buffer = NULL;
|
||||
dfs->open_cbfunc = NULL;
|
||||
dfs->close_cbfunc = NULL;
|
||||
dfs->size_cbfunc = NULL;
|
||||
dfs->seek_cbfunc = NULL;
|
||||
dfs->read_cbfunc = NULL;
|
||||
dfs->cbdata = NULL;
|
||||
}
|
||||
|
@ -60,11 +60,11 @@ typedef void (*orte_dfs_base_module_open_fn_t)(char *uri,
|
||||
|
||||
/* Close a file
|
||||
*
|
||||
* Closes and invalidates the file descriptor. Note that this doesn't
|
||||
* imply closure of the remote file descriptor as other processes
|
||||
* may also be accessing it
|
||||
* Closes and invalidates the file descriptor
|
||||
*/
|
||||
typedef void (*orte_dfs_base_module_close_fn_t)(int fd);
|
||||
typedef void (*orte_dfs_base_module_close_fn_t)(int fd,
|
||||
orte_dfs_close_callback_fn_t cbfunc,
|
||||
void *cbdata);
|
||||
|
||||
/* Get the size of a file
|
||||
*
|
||||
@ -79,8 +79,15 @@ typedef void (*orte_dfs_base_module_get_file_size_fn_t)(int fd,
|
||||
* relative to the location specified by whence:
|
||||
* SEEK_SET => from beginning of file
|
||||
* SEEK_CUR => from current location
|
||||
*
|
||||
* The callback will return the offset, or a negative value if
|
||||
* the requested seek would take the pointer past the end of the
|
||||
* file. This is contrary to standard lseek behavior, but is consistent
|
||||
* with the read-only nature of this framework
|
||||
*/
|
||||
typedef void (*orte_dfs_base_module_seek_fn_t)(int fd, size_t offset, int whence);
|
||||
typedef void (*orte_dfs_base_module_seek_fn_t)(int fd, long offset, int whence,
|
||||
orte_dfs_seek_callback_fn_t cbfunc,
|
||||
void *cbdata);
|
||||
|
||||
/* Read bytes from a possibly remote file
|
||||
*
|
||||
|
@ -26,8 +26,12 @@ typedef uint8_t orte_dfs_cmd_t;
|
||||
|
||||
typedef void (*orte_dfs_open_callback_fn_t)(int fd, void *cbdata);
|
||||
|
||||
typedef void (*orte_dfs_close_callback_fn_t)(int fd, void *cbdata);
|
||||
|
||||
typedef void (*orte_dfs_size_callback_fn_t)(long size, void *cbdata);
|
||||
|
||||
typedef void (*orte_dfs_seek_callback_fn_t)(long offset, void *cbdata);
|
||||
|
||||
typedef void (*orte_dfs_read_callback_fn_t)(long status,
|
||||
uint8_t *buffer,
|
||||
void *cbdata);
|
||||
|
@ -54,11 +54,15 @@ static int finalize(void);
|
||||
static void dfs_open(char *uri,
|
||||
orte_dfs_open_callback_fn_t cbfunc,
|
||||
void *cbdata);
|
||||
static void dfs_close(int fd);
|
||||
static void dfs_close(int fd,
|
||||
orte_dfs_close_callback_fn_t cbfunc,
|
||||
void *cbdata);
|
||||
static void dfs_get_file_size(int fd,
|
||||
orte_dfs_size_callback_fn_t cbfunc,
|
||||
void *cbdata);
|
||||
static void dfs_seek(int fd, size_t offset, int whence);
|
||||
static void dfs_seek(int fd, long offset, int whence,
|
||||
orte_dfs_seek_callback_fn_t cbfunc,
|
||||
void *cbdata);
|
||||
static void dfs_read(int fd, uint8_t *buffer,
|
||||
long length,
|
||||
orte_dfs_read_callback_fn_t cbfunc,
|
||||
@ -361,6 +365,9 @@ static void process_close(int fd, short args, void *cbdata)
|
||||
}
|
||||
if (NULL == trk) {
|
||||
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
||||
if (NULL != close_dfs->close_cbfunc) {
|
||||
close_dfs->close_cbfunc(close_dfs->local_fd, close_dfs->cbdata);
|
||||
}
|
||||
OBJ_RELEASE(close_dfs);
|
||||
return;
|
||||
}
|
||||
@ -401,10 +408,15 @@ static void process_close(int fd, short args, void *cbdata)
|
||||
complete:
|
||||
opal_list_remove_item(&active_files, &trk->super);
|
||||
OBJ_RELEASE(trk);
|
||||
if (NULL != close_dfs->close_cbfunc) {
|
||||
close_dfs->close_cbfunc(close_dfs->local_fd, close_dfs->cbdata);
|
||||
}
|
||||
OBJ_RELEASE(close_dfs);
|
||||
}
|
||||
|
||||
static void dfs_close(int fd)
|
||||
static void dfs_close(int fd,
|
||||
orte_dfs_close_callback_fn_t cbfunc,
|
||||
void *cbdata)
|
||||
{
|
||||
orte_dfs_request_t *dfs;
|
||||
|
||||
@ -415,6 +427,8 @@ static void dfs_close(int fd)
|
||||
dfs = OBJ_NEW(orte_dfs_request_t);
|
||||
dfs->cmd = ORTE_DFS_CLOSE_CMD;
|
||||
dfs->local_fd = fd;
|
||||
dfs->close_cbfunc = cbfunc;
|
||||
dfs->cbdata = cbdata;
|
||||
|
||||
/* post it for processing */
|
||||
ORTE_DFS_POST_REQUEST(dfs, process_close);
|
||||
@ -532,6 +546,7 @@ static void process_seeks(int fd, short args, void *cbdata)
|
||||
opal_buffer_t *buffer;
|
||||
int64_t i64;
|
||||
int rc;
|
||||
struct stat buf;
|
||||
|
||||
opal_output_verbose(1, orte_dfs_base.output,
|
||||
"%s processing seek on fd %d",
|
||||
@ -563,9 +578,43 @@ static void process_seeks(int fd, short args, void *cbdata)
|
||||
"%s local seek on fd %d",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
seek_dfs->local_fd);
|
||||
lseek(trk->remote_fd, seek_dfs->read_length, seek_dfs->remote_fd);
|
||||
/* stat the file and get its size */
|
||||
if (0 > stat(trk->filename, &buf)) {
|
||||
/* cannot stat file */
|
||||
opal_output_verbose(1, orte_dfs_base.output,
|
||||
"%s could not stat %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
trk->filename);
|
||||
if (NULL != seek_dfs->seek_cbfunc) {
|
||||
seek_dfs->seek_cbfunc(-1, seek_dfs->cbdata);
|
||||
}
|
||||
} else if (buf.st_size < seek_dfs->read_length &&
|
||||
SEEK_SET == seek_dfs->remote_fd) {
|
||||
/* seek would take us past EOF */
|
||||
if (NULL != seek_dfs->seek_cbfunc) {
|
||||
seek_dfs->seek_cbfunc(-1, seek_dfs->cbdata);
|
||||
}
|
||||
} else if (buf.st_size < (off_t)(trk->location + seek_dfs->read_length) &&
|
||||
SEEK_CUR == seek_dfs->remote_fd) {
|
||||
/* seek would take us past EOF */
|
||||
if (NULL != seek_dfs->seek_cbfunc) {
|
||||
seek_dfs->seek_cbfunc(-1, seek_dfs->cbdata);
|
||||
}
|
||||
} else {
|
||||
lseek(trk->remote_fd, seek_dfs->read_length, seek_dfs->remote_fd);
|
||||
if (SEEK_SET == seek_dfs->remote_fd) {
|
||||
trk->location = seek_dfs->read_length;
|
||||
} else {
|
||||
trk->location += seek_dfs->read_length;
|
||||
}
|
||||
}
|
||||
goto complete;
|
||||
}
|
||||
/* add this request to our local list so we can
|
||||
* match it with the returned response when it comes
|
||||
*/
|
||||
seek_dfs->id = req_id++;
|
||||
opal_list_append(&requests, &seek_dfs->super);
|
||||
|
||||
/* setup a message for the daemon telling
|
||||
* them what file to seek
|
||||
@ -575,6 +624,12 @@ static void process_seeks(int fd, short args, void *cbdata)
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto complete;
|
||||
}
|
||||
/* pass the request id */
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &seek_dfs->id, 1, OPAL_UINT64))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
opal_list_remove_item(&requests, &seek_dfs->super);
|
||||
goto complete;
|
||||
}
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &trk->remote_fd, 1, OPAL_INT))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
goto complete;
|
||||
@ -608,7 +663,9 @@ static void process_seeks(int fd, short args, void *cbdata)
|
||||
}
|
||||
|
||||
|
||||
static void dfs_seek(int fd, size_t offset, int whence)
|
||||
static void dfs_seek(int fd, long offset, int whence,
|
||||
orte_dfs_seek_callback_fn_t cbfunc,
|
||||
void *cbdata)
|
||||
{
|
||||
orte_dfs_request_t *dfs;
|
||||
|
||||
@ -621,6 +678,8 @@ static void dfs_seek(int fd, size_t offset, int whence)
|
||||
dfs->local_fd = fd;
|
||||
dfs->read_length = offset;
|
||||
dfs->remote_fd = whence;
|
||||
dfs->seek_cbfunc = cbfunc;
|
||||
dfs->cbdata = cbdata;
|
||||
|
||||
/* post it for processing */
|
||||
ORTE_DFS_POST_REQUEST(dfs, process_seeks);
|
||||
@ -656,6 +715,10 @@ static void process_reads(int fd, short args, void *cbdata)
|
||||
/* if the file is local, read the desired bytes */
|
||||
if (trk->host_daemon.vpid == ORTE_PROC_MY_DAEMON->vpid) {
|
||||
nbytes = read(trk->remote_fd, read_dfs->read_buffer, read_dfs->read_length);
|
||||
if (0 < nbytes) {
|
||||
/* update our location */
|
||||
trk->location += nbytes;
|
||||
}
|
||||
/* pass them back to the caller */
|
||||
if (NULL != read_dfs->read_cbfunc) {
|
||||
read_dfs->read_cbfunc(nbytes, read_dfs->read_buffer, read_dfs->cbdata);
|
||||
@ -828,13 +891,13 @@ static void recv_dfs_cmd(int status, orte_process_name_t* sender,
|
||||
return;
|
||||
}
|
||||
/* find the corresponding tracker */
|
||||
for (item = opal_list_get_first(&requests);
|
||||
item != opal_list_get_end(&requests);
|
||||
for (item = opal_list_get_first(&active_files);
|
||||
item != opal_list_get_end(&active_files);
|
||||
item = opal_list_get_next(item)) {
|
||||
trk = (orte_dfs_tracker_t*)item;
|
||||
if (my_fd == trk->local_fd) {
|
||||
/* remove it */
|
||||
opal_list_remove_item(&requests, item);
|
||||
opal_list_remove_item(&active_files, item);
|
||||
OBJ_RELEASE(item);
|
||||
/* close the file */
|
||||
close(my_fd);
|
||||
@ -858,8 +921,8 @@ static void recv_dfs_cmd(int status, orte_process_name_t* sender,
|
||||
}
|
||||
/* find the corresponding tracker */
|
||||
i64 = -1;
|
||||
for (item = opal_list_get_first(&requests);
|
||||
item != opal_list_get_end(&requests);
|
||||
for (item = opal_list_get_first(&active_files);
|
||||
item != opal_list_get_end(&active_files);
|
||||
item = opal_list_get_next(item)) {
|
||||
trk = (orte_dfs_tracker_t*)item;
|
||||
if (my_fd == trk->local_fd) {
|
||||
@ -901,6 +964,12 @@ static void recv_dfs_cmd(int status, orte_process_name_t* sender,
|
||||
break;
|
||||
|
||||
case ORTE_DFS_SEEK_CMD:
|
||||
/* unpack their request id */
|
||||
cnt = 1;
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return;
|
||||
}
|
||||
/* unpack our fd */
|
||||
cnt = 1;
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &my_fd, &cnt, OPAL_INT))) {
|
||||
@ -919,19 +988,78 @@ static void recv_dfs_cmd(int status, orte_process_name_t* sender,
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return;
|
||||
}
|
||||
/* set default error */
|
||||
bytes_read = -1;
|
||||
/* find the corresponding tracker - we do this to ensure
|
||||
* that the local fd we were sent is actually open
|
||||
*/
|
||||
for (item = opal_list_get_first(&requests);
|
||||
item != opal_list_get_end(&requests);
|
||||
for (item = opal_list_get_first(&active_files);
|
||||
item != opal_list_get_end(&active_files);
|
||||
item = opal_list_get_next(item)) {
|
||||
trk = (orte_dfs_tracker_t*)item;
|
||||
if (my_fd == trk->local_fd) {
|
||||
/* do the seek */
|
||||
lseek(my_fd, (long)i64, whence);
|
||||
/* stat the file and get its size */
|
||||
if (0 > stat(trk->filename, &buf)) {
|
||||
/* cannot stat file */
|
||||
opal_output_verbose(1, orte_dfs_base.output,
|
||||
"%s seek could not stat %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
trk->filename);
|
||||
} else if (buf.st_size < i64 && SEEK_SET == whence) {
|
||||
/* seek would take us past EOF */
|
||||
opal_output_verbose(1, orte_dfs_base.output,
|
||||
"%s seek SET past EOF on file %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
trk->filename);
|
||||
bytes_read = -2;
|
||||
} else if (buf.st_size < (off_t)(trk->location + i64) &&
|
||||
SEEK_CUR == whence) {
|
||||
/* seek would take us past EOF */
|
||||
opal_output_verbose(1, orte_dfs_base.output,
|
||||
"%s seek CUR past EOF on file %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
trk->filename);
|
||||
bytes_read = -3;
|
||||
} else {
|
||||
lseek(my_fd, i64, whence);
|
||||
if (SEEK_SET == whence) {
|
||||
trk->location = i64;
|
||||
} else {
|
||||
trk->location += i64;
|
||||
}
|
||||
bytes_read = i64;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
/* construct the return message */
|
||||
buffer = OBJ_NEW(opal_buffer_t);
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &cmd, 1, ORTE_DFS_CMD_T))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return;
|
||||
}
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &rid, 1, OPAL_UINT64))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return;
|
||||
}
|
||||
/* return the offset/status */
|
||||
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &bytes_read, 1, OPAL_INT64))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
return;
|
||||
}
|
||||
/* send it */
|
||||
opal_output_verbose(1, orte_dfs_base.output,
|
||||
"%s sending %ld offset back to %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
(long)bytes_read,
|
||||
ORTE_NAME_PRINT(sender));
|
||||
if (0 > (rc = orte_rml.send_buffer_nb(sender, buffer,
|
||||
ORTE_RML_TAG_DFS_DATA, 0,
|
||||
orte_rml_send_callback, NULL))) {
|
||||
ORTE_ERROR_LOG(rc);
|
||||
OBJ_RELEASE(buffer);
|
||||
return;
|
||||
}
|
||||
break;
|
||||
|
||||
case ORTE_DFS_READ_CMD:
|
||||
@ -976,6 +1104,10 @@ static void recv_dfs_cmd(int status, orte_process_name_t* sender,
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
||||
read_buf = (uint8_t*)malloc(i64);
|
||||
bytes_read = read(my_fd, read_buf, (long)i64);
|
||||
if (0 < bytes_read) {
|
||||
/* update our location */
|
||||
trk->location += bytes_read;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -160,6 +160,7 @@ int orte_init(int* pargc, char*** pargv, orte_proc_type_t flags)
|
||||
*/
|
||||
opal_event_set(orte_event_base, &orte_finalize_event, -1, OPAL_EV_WRITE, ignore_callback, NULL);
|
||||
opal_event_set_priority(&orte_finalize_event, ORTE_ERROR_PRI);
|
||||
#if 0
|
||||
{
|
||||
/* seems strange, but wake us up once a second just so we can check for new events */
|
||||
opal_event_t *ev;
|
||||
@ -170,6 +171,7 @@ int orte_init(int* pargc, char*** pargv, orte_proc_type_t flags)
|
||||
opal_event_set_priority(ev, ORTE_INFO_PRI);
|
||||
opal_event_evtimer_add(ev, &tv);
|
||||
}
|
||||
#endif
|
||||
/* construct the thread object */
|
||||
OBJ_CONSTRUCT(&orte_progress_thread, opal_thread_t);
|
||||
/* fork off a thread to progress it */
|
||||
|
@ -16,6 +16,7 @@
|
||||
#include "orte/mca/errmgr/errmgr.h"
|
||||
#include "orte/runtime/orte_globals.h"
|
||||
#include "orte/runtime/runtime.h"
|
||||
#include "orte/runtime/orte_wait.h"
|
||||
|
||||
#include "orte/mca/dfs/dfs.h"
|
||||
|
||||
@ -23,7 +24,8 @@ static bool active;
|
||||
static bool read_active;
|
||||
static int numread = 0;
|
||||
|
||||
#define READ_SIZE 1000
|
||||
#define READ_SIZE 500
|
||||
#define OFFSET_VALUE 313
|
||||
|
||||
static void dfs_open_cbfunc(int fd, void *cbdata)
|
||||
{
|
||||
@ -35,6 +37,12 @@ static void dfs_open_cbfunc(int fd, void *cbdata)
|
||||
|
||||
}
|
||||
|
||||
static void dfs_close_cbfunc(int fd, void *cbdata)
|
||||
{
|
||||
opal_output(0, "CLOSE CONFIRMED");
|
||||
active = false;
|
||||
}
|
||||
|
||||
static void dfs_size_cbfunc(long size, void *cbdata)
|
||||
{
|
||||
opal_output(0, "GOT FILE SIZE %ld", size);
|
||||
@ -42,6 +50,16 @@ static void dfs_size_cbfunc(long size, void *cbdata)
|
||||
|
||||
}
|
||||
|
||||
static void dfs_seek_cbfunc(long offset, void *cbdata)
|
||||
{
|
||||
opal_output(0, "GOT FILE OFFSET %ld vs %d", offset, OFFSET_VALUE);
|
||||
active = false;
|
||||
if (offset != OFFSET_VALUE) {
|
||||
exit(1);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
static void read_cbfunc(long status, uint8_t *buffer, void *cbdata)
|
||||
{
|
||||
opal_output(0, "GOT READ STATUS %d", (int)status);
|
||||
@ -93,35 +111,39 @@ int main(int argc, char* argv[])
|
||||
|
||||
active = true;
|
||||
orte_dfs.open(uri, dfs_open_cbfunc, &fd);
|
||||
while (active) {
|
||||
opal_event_loop(orte_event_base, OPAL_EVLOOP_ONCE);
|
||||
}
|
||||
ORTE_WAIT_FOR_COMPLETION(active);
|
||||
|
||||
active = true;
|
||||
orte_dfs.get_file_size(fd, dfs_size_cbfunc, NULL);
|
||||
while (active) {
|
||||
opal_event_loop(orte_event_base, OPAL_EVLOOP_ONCE);
|
||||
}
|
||||
ORTE_WAIT_FOR_COMPLETION(active);
|
||||
|
||||
active = true;
|
||||
read_active = true;
|
||||
rc = 0;
|
||||
numread = 0;
|
||||
while (read_active) {
|
||||
fprintf(stderr, "reading next %d bytes\n", READ_SIZE);
|
||||
opal_output(0, "reading next %d bytes\n", READ_SIZE);
|
||||
orte_dfs.read(fd, buffer, READ_SIZE, read_cbfunc, NULL);
|
||||
while (active) {
|
||||
opal_event_loop(orte_event_base, OPAL_EVLOOP_ONCE);
|
||||
}
|
||||
ORTE_WAIT_FOR_COMPLETION(active);
|
||||
rc++;
|
||||
if (2 == rc) {
|
||||
orte_dfs.seek(fd, 326, SEEK_SET);
|
||||
active = true;
|
||||
opal_output(0, "execute absolute seek of %d bytes\n", OFFSET_VALUE);
|
||||
orte_dfs.seek(fd, OFFSET_VALUE, SEEK_SET, dfs_seek_cbfunc, NULL);
|
||||
ORTE_WAIT_FOR_COMPLETION(active);
|
||||
}
|
||||
if (5 == rc) {
|
||||
active = true;
|
||||
opal_output(0, "execute relative seek of %d bytes\n", OFFSET_VALUE);
|
||||
orte_dfs.seek(fd, OFFSET_VALUE, SEEK_CUR, dfs_seek_cbfunc, NULL);
|
||||
ORTE_WAIT_FOR_COMPLETION(active);
|
||||
}
|
||||
active = true;
|
||||
}
|
||||
|
||||
orte_dfs.close(fd);
|
||||
opal_event_loop(orte_event_base, OPAL_EVLOOP_ONCE);
|
||||
active= true;
|
||||
orte_dfs.close(fd, dfs_close_cbfunc, NULL);
|
||||
ORTE_WAIT_FOR_COMPLETION(active);
|
||||
|
||||
orte_finalize();
|
||||
return 0;
|
||||
|
Загрузка…
x
Ссылка в новой задаче
Block a user