diff --git a/orte/mca/dfs/orted/dfs_orted.c b/orte/mca/dfs/orted/dfs_orted.c index 0782123bf5..4de581b4db 100644 --- a/orte/mca/dfs/orted/dfs_orted.c +++ b/orte/mca/dfs/orted/dfs_orted.c @@ -97,7 +97,85 @@ orte_dfs_base_module_t orte_dfs_orted_module = { dfs_purge_file_maps }; +static void finalize_thread(int fd, short args, void *cbdata) +{ + /* nothing to do here - we just need it to + * kick us out of the event_loop + */ +} + +static void* worker_thread_engine(opal_object_t *obj); + +typedef struct { + opal_object_t super; + int idx; + opal_event_base_t *event_base; + opal_event_t fin_ev; + bool active; + opal_thread_t thread; +} worker_thread_t; +static void wt_const(worker_thread_t *ptr) +{ + /* create an event base for this thread */ + ptr->event_base = opal_event_base_create(); + /* setup an event to finalize it */ + opal_event_set(ptr->event_base, &ptr->fin_ev, -1, OPAL_EV_WRITE, finalize_thread, NULL); + /* construct the thread object */ + OBJ_CONSTRUCT(&ptr->thread, opal_thread_t); + /* fork off a thread to progress it */ + ptr->active = true; + ptr->thread.t_run = worker_thread_engine; + ptr->thread.t_arg = ptr; + opal_thread_start(&ptr->thread); +} +static void wt_dest(worker_thread_t *ptr) +{ + /* stop the thread */ + ptr->active = false; + /* trigger the finalize event */ + opal_event_active(&ptr->fin_ev, OPAL_EV_WRITE, 1); + /* wait for thread to exit */ + opal_thread_join(&ptr->thread, NULL); + OBJ_DESTRUCT(&ptr->thread); + /* release the event base */ + opal_event_base_free(ptr->event_base); +} +OBJ_CLASS_INSTANCE(worker_thread_t, + opal_object_t, + wt_const, wt_dest); + +typedef struct { + opal_object_t super; + opal_event_t ev; + uint64_t rid; + orte_dfs_tracker_t *trk; + int64_t nbytes; + int whence; +} worker_req_t; +OBJ_CLASS_INSTANCE(worker_req_t, + opal_object_t, + NULL, NULL); +#define ORTE_DFS_POST_WORKER(r, cb) \ + do { \ + worker_thread_t *wt; \ + wt = (worker_thread_t*)opal_pointer_array_get_item(&worker_threads, wt_cntr); \ + opal_output_verbose(1, orte_dfs_base.output, \ + "%s assigning req to worker thread %d", \ + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), \ + wt->idx); \ + opal_event_set(wt->event_base, &((r)->ev), \ + -1, OPAL_EV_WRITE, (cb), (r)); \ + opal_event_active(&((r)->ev), OPAL_EV_WRITE, 1); \ + /* move to the next thread */ \ + wt_cntr++; \ + if (wt_cntr == orte_dfs_orted_num_worker_threads) { \ + wt_cntr = 0; \ + } \ + } while(0); + static opal_list_t requests, active_files, file_maps; +static opal_pointer_array_t worker_threads; +static int wt_cntr = 0; static int local_fd = 0; static uint64_t req_id = 0; static void recv_dfs_cmd(int status, orte_process_name_t* sender, @@ -106,20 +184,29 @@ static void recv_dfs_cmd(int status, orte_process_name_t* sender, static void recv_dfs_data(int status, orte_process_name_t* sender, opal_buffer_t* buffer, orte_rml_tag_t tag, void* cbdata); +static void remote_read(int fd, short args, void *cbata); +static void remote_open(int fd, short args, void *cbdata); +static void remote_size(int fd, short args, void *cbdata); +static void remote_seek(int fd, short args, void *cbdata); static int init(void) { int rc; + int i; + worker_thread_t *wt; OBJ_CONSTRUCT(&requests, opal_list_t); OBJ_CONSTRUCT(&active_files, opal_list_t); OBJ_CONSTRUCT(&file_maps, opal_list_t); + OBJ_CONSTRUCT(&worker_threads, opal_pointer_array_t); + opal_pointer_array_init(&worker_threads, 1, INT_MAX, 1); if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DFS_CMD, ORTE_RML_PERSISTENT, recv_dfs_cmd, NULL))) { ORTE_ERROR_LOG(rc); + return rc; } if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DFS_DATA, @@ -127,13 +214,27 @@ static int init(void) recv_dfs_data, NULL))) { ORTE_ERROR_LOG(rc); + return rc; } + + opal_output_verbose(1, orte_dfs_base.output, + "%s starting %d worker threads", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + orte_dfs_orted_num_worker_threads); + for (i=0; i < orte_dfs_orted_num_worker_threads; i++) { + wt = OBJ_NEW(worker_thread_t); + wt->idx = i; + opal_pointer_array_add(&worker_threads, wt); + } + return rc; } static int finalize(void) { opal_list_item_t *item; + int i; + worker_thread_t *wt; orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DFS_CMD); orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DFS_DATA); @@ -149,6 +250,12 @@ static int finalize(void) OBJ_RELEASE(item); } OBJ_DESTRUCT(&file_maps); + for (i=0; i < worker_threads.size; i++) { + if (NULL != (wt = (worker_thread_t*)opal_pointer_array_get_item(&worker_threads, i))) { + OBJ_RELEASE(wt); + } + } + OBJ_DESTRUCT(&worker_threads); return ORTE_SUCCESS; } @@ -1208,6 +1315,7 @@ static void recv_dfs_cmd(int status, orte_process_name_t* sender, int i, j; orte_vpid_t vpid; int32_t nentries, ncontributors; + worker_req_t *wrkr; /* unpack the command */ cnt = 1; @@ -1235,7 +1343,23 @@ static void recv_dfs_cmd(int status, orte_process_name_t* sender, ORTE_ERROR_LOG(rc); return; } - /* attempt to open the file */ + /* create a tracker for this file */ + trk = OBJ_NEW(orte_dfs_tracker_t); + trk->requestor.jobid = sender->jobid; + trk->requestor.vpid = sender->vpid; + trk->host_daemon.jobid = ORTE_PROC_MY_NAME->jobid; + trk->host_daemon.vpid = ORTE_PROC_MY_NAME->vpid; + trk->filename = strdup(filename); + opal_list_append(&active_files, &trk->super); + /* process the request */ + if (0 < orte_dfs_orted_num_worker_threads) { + wrkr = OBJ_NEW(worker_req_t); + wrkr->trk = trk; + wrkr->rid = rid; + ORTE_DFS_POST_WORKER(wrkr, remote_open); + return; + } + /* no worker threads, so attempt to open the file */ opal_output_verbose(1, orte_dfs_base.output, "%s opening file %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), @@ -1244,15 +1368,7 @@ static void recv_dfs_cmd(int status, orte_process_name_t* sender, ORTE_ERROR_LOG(ORTE_ERR_FILE_OPEN_FAILURE); goto answer_open; } - /* create a tracker for this file */ - trk = OBJ_NEW(orte_dfs_tracker_t); - trk->requestor.jobid = sender->jobid; - trk->requestor.vpid = sender->vpid; - trk->host_daemon.jobid = ORTE_PROC_MY_NAME->jobid; - trk->host_daemon.vpid = ORTE_PROC_MY_NAME->vpid; - trk->filename = strdup(filename); trk->local_fd = my_fd; - opal_list_append(&active_files, &trk->super); answer_open: /* construct the return message */ answer = OBJ_NEW(opal_buffer_t); @@ -1321,7 +1437,15 @@ static void recv_dfs_cmd(int status, orte_process_name_t* sender, item = opal_list_get_next(item)) { trk = (orte_dfs_tracker_t*)item; if (my_fd == trk->local_fd) { - /* stat the file and get its size */ + /* process the request */ + if (0 < orte_dfs_orted_num_worker_threads) { + wrkr = OBJ_NEW(worker_req_t); + wrkr->trk = trk; + wrkr->rid = rid; + ORTE_DFS_POST_WORKER(wrkr, remote_size); + return; + } + /* no worker threads, so stat the file and get its size */ if (0 > stat(trk->filename, &buf)) { /* cannot stat file */ opal_output_verbose(1, orte_dfs_base.output, @@ -1393,7 +1517,17 @@ static void recv_dfs_cmd(int status, orte_process_name_t* sender, item = opal_list_get_next(item)) { trk = (orte_dfs_tracker_t*)item; if (my_fd == trk->local_fd) { - /* stat the file and get its size */ + /* process the request */ + if (0 < orte_dfs_orted_num_worker_threads) { + wrkr = OBJ_NEW(worker_req_t); + wrkr->trk = trk; + wrkr->rid = rid; + wrkr->nbytes = i64; + wrkr->whence = whence; + ORTE_DFS_POST_WORKER(wrkr, remote_seek); + return; + } + /* no worker threads, so stat the file and get its size */ if (0 > stat(trk->filename, &buf)) { /* cannot stat file */ opal_output_verbose(1, orte_dfs_base.output, @@ -1481,10 +1615,6 @@ static void recv_dfs_cmd(int status, orte_process_name_t* sender, ORTE_ERROR_LOG(rc); goto answer_read; } - opal_output_verbose(1, orte_dfs_base.output, - "%s reading %ld bytes from local fd %d", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - (long)i64, my_fd); /* find the corresponding tracker - we do this to ensure * that the local fd we were sent is actually open */ @@ -1493,20 +1623,31 @@ static void recv_dfs_cmd(int status, orte_process_name_t* sender, item = opal_list_get_next(item)) { trk = (orte_dfs_tracker_t*)item; if (my_fd == trk->local_fd) { - /* do the read */ - opal_output_verbose(1, orte_dfs_base.output, - "%s issuing read", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); - read_buf = (uint8_t*)malloc(i64); - bytes_read = read(my_fd, read_buf, (long)i64); - if (0 < bytes_read) { - /* update our location */ - trk->location += bytes_read; + if (0 < orte_dfs_orted_num_worker_threads) { + wrkr = OBJ_NEW(worker_req_t); + wrkr->rid = rid; + wrkr->trk = trk; + wrkr->nbytes = i64; + /* dispatch to the currently indexed thread */ + ORTE_DFS_POST_WORKER(wrkr, remote_read); + return; + } else { + opal_output_verbose(1, orte_dfs_base.output, + "%s reading %ld bytes from local fd %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + (long)i64, my_fd); + /* do the read */ + read_buf = (uint8_t*)malloc(i64); + bytes_read = read(my_fd, read_buf, (long)i64); + if (0 < bytes_read) { + /* update our location */ + trk->location += bytes_read; + } } break; } } - answer_read: + answer_read: /* construct the return message */ answer = OBJ_NEW(opal_buffer_t); if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &cmd, 1, ORTE_DFS_CMD_T))) { @@ -1972,3 +2113,216 @@ static void recv_dfs_data(int status, orte_process_name_t* sender, break; } } + +static void* worker_thread_engine(opal_object_t *obj) +{ + opal_thread_t *thread = (opal_thread_t*)obj; + worker_thread_t *ptr = (worker_thread_t*)thread->t_arg; + + while (ptr->active) { + opal_event_loop(ptr->event_base, OPAL_EVLOOP_ONCE); + } + return OPAL_THREAD_CANCELLED; +} + +static void remote_open(int fd, short args, void *cbdata) +{ + worker_req_t *req = (worker_req_t*)cbdata; + opal_buffer_t *answer; + orte_dfs_cmd_t cmd = ORTE_DFS_OPEN_CMD; + int rc; + + /* attempt to open the file */ + opal_output_verbose(1, orte_dfs_base.output, + "%s opening file %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + req->trk->filename); + if (0 > (req->trk->local_fd = open(req->trk->filename, O_RDONLY))) { + ORTE_ERROR_LOG(ORTE_ERR_FILE_OPEN_FAILURE); + } + /* construct the return message */ + answer = OBJ_NEW(opal_buffer_t); + if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &cmd, 1, ORTE_DFS_CMD_T))) { + ORTE_ERROR_LOG(rc); + return; + } + if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &req->rid, 1, OPAL_UINT64))) { + ORTE_ERROR_LOG(rc); + return; + } + if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &req->trk->local_fd, 1, OPAL_INT))) { + ORTE_ERROR_LOG(rc); + return; + } + /* send it */ + if (0 > (rc = orte_rml.send_buffer_nb(&req->trk->requestor, answer, + ORTE_RML_TAG_DFS_DATA, 0, + orte_rml_send_callback, NULL))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(answer); + } +} + +static void remote_size(int fd, short args, void *cbdata) +{ + worker_req_t *req = (worker_req_t*)cbdata; + int rc; + struct stat buf; + int64_t i64; + opal_buffer_t *answer; + orte_dfs_cmd_t cmd = ORTE_DFS_SIZE_CMD; + + if (0 > stat(req->trk->filename, &buf)) { + /* cannot stat file */ + opal_output_verbose(1, orte_dfs_base.output, + "%s could not stat %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + req->trk->filename); + } else { + i64 = buf.st_size; + } + /* construct the return message */ + answer = OBJ_NEW(opal_buffer_t); + if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &cmd, 1, ORTE_DFS_CMD_T))) { + ORTE_ERROR_LOG(rc); + return; + } + if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &req->rid, 1, OPAL_UINT64))) { + ORTE_ERROR_LOG(rc); + return; + } + if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &i64, 1, OPAL_INT64))) { + ORTE_ERROR_LOG(rc); + return; + } + /* send it */ + if (0 > (rc = orte_rml.send_buffer_nb(&req->trk->requestor, answer, + ORTE_RML_TAG_DFS_DATA, 0, + orte_rml_send_callback, NULL))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(answer); + } +} + +static void remote_seek(int fd, short args, void *cbdata) +{ + worker_req_t *req = (worker_req_t*)cbdata; + opal_buffer_t *answer; + orte_dfs_cmd_t cmd = ORTE_DFS_SEEK_CMD; + int rc; + struct stat buf; + int64_t i64; + + /* stat the file and get its size */ + if (0 > stat(req->trk->filename, &buf)) { + /* cannot stat file */ + opal_output_verbose(1, orte_dfs_base.output, + "%s seek could not stat %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + req->trk->filename); + } else if (buf.st_size < req->nbytes && SEEK_SET == req->whence) { + /* seek would take us past EOF */ + opal_output_verbose(1, orte_dfs_base.output, + "%s seek SET past EOF on file %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + req->trk->filename); + i64 = -2; + } else if (buf.st_size < (off_t)(req->trk->location + req->nbytes) && + SEEK_CUR == req->whence) { + /* seek would take us past EOF */ + opal_output_verbose(1, orte_dfs_base.output, + "%s seek CUR past EOF on file %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + req->trk->filename); + i64 = -3; + } else { + lseek(req->trk->local_fd, req->nbytes, req->whence); + if (SEEK_SET == req->whence) { + req->trk->location = req->nbytes; + } else { + req->trk->location += req->nbytes; + } + i64 = req->nbytes; + } + + /* construct the return message */ + answer = OBJ_NEW(opal_buffer_t); + if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &cmd, 1, ORTE_DFS_CMD_T))) { + ORTE_ERROR_LOG(rc); + return; + } + if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &req->rid, 1, OPAL_UINT64))) { + ORTE_ERROR_LOG(rc); + return; + } + if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &i64, 1, OPAL_INT64))) { + ORTE_ERROR_LOG(rc); + return; + } + /* send it */ + if (0 > (rc = orte_rml.send_buffer_nb(&req->trk->requestor, answer, + ORTE_RML_TAG_DFS_DATA, 0, + orte_rml_send_callback, NULL))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(answer); + } +} + +static void remote_read(int fd, short args, void *cbdata) +{ + worker_req_t *req = (worker_req_t*)cbdata; + uint8_t *read_buf; + opal_buffer_t *answer; + orte_dfs_cmd_t cmd = ORTE_DFS_READ_CMD; + int64_t bytes_read; + int rc; + + /* do the read */ + opal_output_verbose(1, orte_dfs_base.output, + "%s issuing read", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + read_buf = (uint8_t*)malloc(req->nbytes); + bytes_read = read(req->trk->local_fd, read_buf, (long)req->nbytes); + if (0 < bytes_read) { + /* update our location */ + req->trk->location += bytes_read; + } + /* construct the return message */ + answer = OBJ_NEW(opal_buffer_t); + if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &cmd, 1, ORTE_DFS_CMD_T))) { + ORTE_ERROR_LOG(rc); + return; + } + if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &req->rid, 1, OPAL_UINT64))) { + ORTE_ERROR_LOG(rc); + return; + } + /* include the number of bytes read */ + if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &bytes_read, 1, OPAL_INT64))) { + ORTE_ERROR_LOG(rc); + return; + } + /* include the bytes read */ + if (0 < bytes_read) { + if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, read_buf, bytes_read, OPAL_UINT8))) { + ORTE_ERROR_LOG(rc); + return; + } + } + /* send it */ + opal_output_verbose(1, orte_dfs_base.output, + "%s sending %ld bytes back to %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + (long)bytes_read, + ORTE_NAME_PRINT(&req->trk->requestor)); + if (0 > (rc = orte_rml.send_buffer_nb(&req->trk->requestor, answer, + ORTE_RML_TAG_DFS_DATA, 0, + orte_rml_send_callback, NULL))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(answer); + return; + } + free(read_buf); + OBJ_RELEASE(req); +} + diff --git a/orte/mca/dfs/orted/dfs_orted.h b/orte/mca/dfs/orted/dfs_orted.h index ff1e5f2a19..f90e87b14e 100644 --- a/orte/mca/dfs/orted/dfs_orted.h +++ b/orte/mca/dfs/orted/dfs_orted.h @@ -31,6 +31,8 @@ ORTE_MODULE_DECLSPEC extern orte_dfs_base_component_t mca_dfs_orted_component; ORTE_DECLSPEC extern orte_dfs_base_module_t orte_dfs_orted_module; +extern int orte_dfs_orted_num_worker_threads; + END_C_DECLS #endif /* MCA_dfs_orted_EXPORT_H */ diff --git a/orte/mca/dfs/orted/dfs_orted_component.c b/orte/mca/dfs/orted/dfs_orted_component.c index 722b525864..b8c1c0d898 100644 --- a/orte/mca/dfs/orted/dfs_orted_component.c +++ b/orte/mca/dfs/orted/dfs_orted_component.c @@ -24,6 +24,8 @@ const char *orte_dfs_orted_component_version_string = "ORTE DFS orted MCA component version " ORTE_VERSION; +int orte_dfs_orted_num_worker_threads = 0; + /* * Local functionality */ @@ -61,6 +63,11 @@ orte_dfs_base_component_t mca_dfs_orted_component = static int dfs_orted_open(void) { + mca_base_component_t *c = &mca_dfs_orted_component.base_version; + + mca_base_param_reg_int(c, "num_worker_threads", + "Number of worker threads to use for processing file requests", + false, false, 0, &orte_dfs_orted_num_worker_threads); return ORTE_SUCCESS; }