1
1

Enable multiple worker threads for processing DFS requests

This commit was SVN r27659.
Этот коммит содержится в:
Ralph Castain 2012-12-09 02:54:19 +00:00
родитель c26ed7dcdd
Коммит 1e92aa2b66
3 изменённых файлов: 388 добавлений и 25 удалений

Просмотреть файл

@ -97,7 +97,85 @@ orte_dfs_base_module_t orte_dfs_orted_module = {
dfs_purge_file_maps
};
static void finalize_thread(int fd, short args, void *cbdata)
{
/* nothing to do here - we just need it to
* kick us out of the event_loop
*/
}
static void* worker_thread_engine(opal_object_t *obj);
typedef struct {
opal_object_t super;
int idx;
opal_event_base_t *event_base;
opal_event_t fin_ev;
bool active;
opal_thread_t thread;
} worker_thread_t;
static void wt_const(worker_thread_t *ptr)
{
/* create an event base for this thread */
ptr->event_base = opal_event_base_create();
/* setup an event to finalize it */
opal_event_set(ptr->event_base, &ptr->fin_ev, -1, OPAL_EV_WRITE, finalize_thread, NULL);
/* construct the thread object */
OBJ_CONSTRUCT(&ptr->thread, opal_thread_t);
/* fork off a thread to progress it */
ptr->active = true;
ptr->thread.t_run = worker_thread_engine;
ptr->thread.t_arg = ptr;
opal_thread_start(&ptr->thread);
}
static void wt_dest(worker_thread_t *ptr)
{
/* stop the thread */
ptr->active = false;
/* trigger the finalize event */
opal_event_active(&ptr->fin_ev, OPAL_EV_WRITE, 1);
/* wait for thread to exit */
opal_thread_join(&ptr->thread, NULL);
OBJ_DESTRUCT(&ptr->thread);
/* release the event base */
opal_event_base_free(ptr->event_base);
}
OBJ_CLASS_INSTANCE(worker_thread_t,
opal_object_t,
wt_const, wt_dest);
typedef struct {
opal_object_t super;
opal_event_t ev;
uint64_t rid;
orte_dfs_tracker_t *trk;
int64_t nbytes;
int whence;
} worker_req_t;
OBJ_CLASS_INSTANCE(worker_req_t,
opal_object_t,
NULL, NULL);
#define ORTE_DFS_POST_WORKER(r, cb) \
do { \
worker_thread_t *wt; \
wt = (worker_thread_t*)opal_pointer_array_get_item(&worker_threads, wt_cntr); \
opal_output_verbose(1, orte_dfs_base.output, \
"%s assigning req to worker thread %d", \
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), \
wt->idx); \
opal_event_set(wt->event_base, &((r)->ev), \
-1, OPAL_EV_WRITE, (cb), (r)); \
opal_event_active(&((r)->ev), OPAL_EV_WRITE, 1); \
/* move to the next thread */ \
wt_cntr++; \
if (wt_cntr == orte_dfs_orted_num_worker_threads) { \
wt_cntr = 0; \
} \
} while(0);
static opal_list_t requests, active_files, file_maps;
static opal_pointer_array_t worker_threads;
static int wt_cntr = 0;
static int local_fd = 0;
static uint64_t req_id = 0;
static void recv_dfs_cmd(int status, orte_process_name_t* sender,
@ -106,20 +184,29 @@ static void recv_dfs_cmd(int status, orte_process_name_t* sender,
static void recv_dfs_data(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata);
static void remote_read(int fd, short args, void *cbata);
static void remote_open(int fd, short args, void *cbdata);
static void remote_size(int fd, short args, void *cbdata);
static void remote_seek(int fd, short args, void *cbdata);
static int init(void)
{
int rc;
int i;
worker_thread_t *wt;
OBJ_CONSTRUCT(&requests, opal_list_t);
OBJ_CONSTRUCT(&active_files, opal_list_t);
OBJ_CONSTRUCT(&file_maps, opal_list_t);
OBJ_CONSTRUCT(&worker_threads, opal_pointer_array_t);
opal_pointer_array_init(&worker_threads, 1, INT_MAX, 1);
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_DFS_CMD,
ORTE_RML_PERSISTENT,
recv_dfs_cmd,
NULL))) {
ORTE_ERROR_LOG(rc);
return rc;
}
if (ORTE_SUCCESS != (rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
ORTE_RML_TAG_DFS_DATA,
@ -127,13 +214,27 @@ static int init(void)
recv_dfs_data,
NULL))) {
ORTE_ERROR_LOG(rc);
return rc;
}
opal_output_verbose(1, orte_dfs_base.output,
"%s starting %d worker threads",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
orte_dfs_orted_num_worker_threads);
for (i=0; i < orte_dfs_orted_num_worker_threads; i++) {
wt = OBJ_NEW(worker_thread_t);
wt->idx = i;
opal_pointer_array_add(&worker_threads, wt);
}
return rc;
}
static int finalize(void)
{
opal_list_item_t *item;
int i;
worker_thread_t *wt;
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DFS_CMD);
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DFS_DATA);
@ -149,6 +250,12 @@ static int finalize(void)
OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&file_maps);
for (i=0; i < worker_threads.size; i++) {
if (NULL != (wt = (worker_thread_t*)opal_pointer_array_get_item(&worker_threads, i))) {
OBJ_RELEASE(wt);
}
}
OBJ_DESTRUCT(&worker_threads);
return ORTE_SUCCESS;
}
@ -1208,6 +1315,7 @@ static void recv_dfs_cmd(int status, orte_process_name_t* sender,
int i, j;
orte_vpid_t vpid;
int32_t nentries, ncontributors;
worker_req_t *wrkr;
/* unpack the command */
cnt = 1;
@ -1235,7 +1343,23 @@ static void recv_dfs_cmd(int status, orte_process_name_t* sender,
ORTE_ERROR_LOG(rc);
return;
}
/* attempt to open the file */
/* create a tracker for this file */
trk = OBJ_NEW(orte_dfs_tracker_t);
trk->requestor.jobid = sender->jobid;
trk->requestor.vpid = sender->vpid;
trk->host_daemon.jobid = ORTE_PROC_MY_NAME->jobid;
trk->host_daemon.vpid = ORTE_PROC_MY_NAME->vpid;
trk->filename = strdup(filename);
opal_list_append(&active_files, &trk->super);
/* process the request */
if (0 < orte_dfs_orted_num_worker_threads) {
wrkr = OBJ_NEW(worker_req_t);
wrkr->trk = trk;
wrkr->rid = rid;
ORTE_DFS_POST_WORKER(wrkr, remote_open);
return;
}
/* no worker threads, so attempt to open the file */
opal_output_verbose(1, orte_dfs_base.output,
"%s opening file %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
@ -1244,15 +1368,7 @@ static void recv_dfs_cmd(int status, orte_process_name_t* sender,
ORTE_ERROR_LOG(ORTE_ERR_FILE_OPEN_FAILURE);
goto answer_open;
}
/* create a tracker for this file */
trk = OBJ_NEW(orte_dfs_tracker_t);
trk->requestor.jobid = sender->jobid;
trk->requestor.vpid = sender->vpid;
trk->host_daemon.jobid = ORTE_PROC_MY_NAME->jobid;
trk->host_daemon.vpid = ORTE_PROC_MY_NAME->vpid;
trk->filename = strdup(filename);
trk->local_fd = my_fd;
opal_list_append(&active_files, &trk->super);
answer_open:
/* construct the return message */
answer = OBJ_NEW(opal_buffer_t);
@ -1321,7 +1437,15 @@ static void recv_dfs_cmd(int status, orte_process_name_t* sender,
item = opal_list_get_next(item)) {
trk = (orte_dfs_tracker_t*)item;
if (my_fd == trk->local_fd) {
/* stat the file and get its size */
/* process the request */
if (0 < orte_dfs_orted_num_worker_threads) {
wrkr = OBJ_NEW(worker_req_t);
wrkr->trk = trk;
wrkr->rid = rid;
ORTE_DFS_POST_WORKER(wrkr, remote_size);
return;
}
/* no worker threads, so stat the file and get its size */
if (0 > stat(trk->filename, &buf)) {
/* cannot stat file */
opal_output_verbose(1, orte_dfs_base.output,
@ -1393,7 +1517,17 @@ static void recv_dfs_cmd(int status, orte_process_name_t* sender,
item = opal_list_get_next(item)) {
trk = (orte_dfs_tracker_t*)item;
if (my_fd == trk->local_fd) {
/* stat the file and get its size */
/* process the request */
if (0 < orte_dfs_orted_num_worker_threads) {
wrkr = OBJ_NEW(worker_req_t);
wrkr->trk = trk;
wrkr->rid = rid;
wrkr->nbytes = i64;
wrkr->whence = whence;
ORTE_DFS_POST_WORKER(wrkr, remote_seek);
return;
}
/* no worker threads, so stat the file and get its size */
if (0 > stat(trk->filename, &buf)) {
/* cannot stat file */
opal_output_verbose(1, orte_dfs_base.output,
@ -1481,10 +1615,6 @@ static void recv_dfs_cmd(int status, orte_process_name_t* sender,
ORTE_ERROR_LOG(rc);
goto answer_read;
}
opal_output_verbose(1, orte_dfs_base.output,
"%s reading %ld bytes from local fd %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(long)i64, my_fd);
/* find the corresponding tracker - we do this to ensure
* that the local fd we were sent is actually open
*/
@ -1493,20 +1623,31 @@ static void recv_dfs_cmd(int status, orte_process_name_t* sender,
item = opal_list_get_next(item)) {
trk = (orte_dfs_tracker_t*)item;
if (my_fd == trk->local_fd) {
/* do the read */
opal_output_verbose(1, orte_dfs_base.output,
"%s issuing read",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
read_buf = (uint8_t*)malloc(i64);
bytes_read = read(my_fd, read_buf, (long)i64);
if (0 < bytes_read) {
/* update our location */
trk->location += bytes_read;
if (0 < orte_dfs_orted_num_worker_threads) {
wrkr = OBJ_NEW(worker_req_t);
wrkr->rid = rid;
wrkr->trk = trk;
wrkr->nbytes = i64;
/* dispatch to the currently indexed thread */
ORTE_DFS_POST_WORKER(wrkr, remote_read);
return;
} else {
opal_output_verbose(1, orte_dfs_base.output,
"%s reading %ld bytes from local fd %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(long)i64, my_fd);
/* do the read */
read_buf = (uint8_t*)malloc(i64);
bytes_read = read(my_fd, read_buf, (long)i64);
if (0 < bytes_read) {
/* update our location */
trk->location += bytes_read;
}
}
break;
}
}
answer_read:
answer_read:
/* construct the return message */
answer = OBJ_NEW(opal_buffer_t);
if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &cmd, 1, ORTE_DFS_CMD_T))) {
@ -1972,3 +2113,216 @@ static void recv_dfs_data(int status, orte_process_name_t* sender,
break;
}
}
static void* worker_thread_engine(opal_object_t *obj)
{
opal_thread_t *thread = (opal_thread_t*)obj;
worker_thread_t *ptr = (worker_thread_t*)thread->t_arg;
while (ptr->active) {
opal_event_loop(ptr->event_base, OPAL_EVLOOP_ONCE);
}
return OPAL_THREAD_CANCELLED;
}
static void remote_open(int fd, short args, void *cbdata)
{
worker_req_t *req = (worker_req_t*)cbdata;
opal_buffer_t *answer;
orte_dfs_cmd_t cmd = ORTE_DFS_OPEN_CMD;
int rc;
/* attempt to open the file */
opal_output_verbose(1, orte_dfs_base.output,
"%s opening file %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
req->trk->filename);
if (0 > (req->trk->local_fd = open(req->trk->filename, O_RDONLY))) {
ORTE_ERROR_LOG(ORTE_ERR_FILE_OPEN_FAILURE);
}
/* construct the return message */
answer = OBJ_NEW(opal_buffer_t);
if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &cmd, 1, ORTE_DFS_CMD_T))) {
ORTE_ERROR_LOG(rc);
return;
}
if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &req->rid, 1, OPAL_UINT64))) {
ORTE_ERROR_LOG(rc);
return;
}
if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &req->trk->local_fd, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
return;
}
/* send it */
if (0 > (rc = orte_rml.send_buffer_nb(&req->trk->requestor, answer,
ORTE_RML_TAG_DFS_DATA, 0,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(answer);
}
}
static void remote_size(int fd, short args, void *cbdata)
{
worker_req_t *req = (worker_req_t*)cbdata;
int rc;
struct stat buf;
int64_t i64;
opal_buffer_t *answer;
orte_dfs_cmd_t cmd = ORTE_DFS_SIZE_CMD;
if (0 > stat(req->trk->filename, &buf)) {
/* cannot stat file */
opal_output_verbose(1, orte_dfs_base.output,
"%s could not stat %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
req->trk->filename);
} else {
i64 = buf.st_size;
}
/* construct the return message */
answer = OBJ_NEW(opal_buffer_t);
if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &cmd, 1, ORTE_DFS_CMD_T))) {
ORTE_ERROR_LOG(rc);
return;
}
if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &req->rid, 1, OPAL_UINT64))) {
ORTE_ERROR_LOG(rc);
return;
}
if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &i64, 1, OPAL_INT64))) {
ORTE_ERROR_LOG(rc);
return;
}
/* send it */
if (0 > (rc = orte_rml.send_buffer_nb(&req->trk->requestor, answer,
ORTE_RML_TAG_DFS_DATA, 0,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(answer);
}
}
static void remote_seek(int fd, short args, void *cbdata)
{
worker_req_t *req = (worker_req_t*)cbdata;
opal_buffer_t *answer;
orte_dfs_cmd_t cmd = ORTE_DFS_SEEK_CMD;
int rc;
struct stat buf;
int64_t i64;
/* stat the file and get its size */
if (0 > stat(req->trk->filename, &buf)) {
/* cannot stat file */
opal_output_verbose(1, orte_dfs_base.output,
"%s seek could not stat %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
req->trk->filename);
} else if (buf.st_size < req->nbytes && SEEK_SET == req->whence) {
/* seek would take us past EOF */
opal_output_verbose(1, orte_dfs_base.output,
"%s seek SET past EOF on file %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
req->trk->filename);
i64 = -2;
} else if (buf.st_size < (off_t)(req->trk->location + req->nbytes) &&
SEEK_CUR == req->whence) {
/* seek would take us past EOF */
opal_output_verbose(1, orte_dfs_base.output,
"%s seek CUR past EOF on file %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
req->trk->filename);
i64 = -3;
} else {
lseek(req->trk->local_fd, req->nbytes, req->whence);
if (SEEK_SET == req->whence) {
req->trk->location = req->nbytes;
} else {
req->trk->location += req->nbytes;
}
i64 = req->nbytes;
}
/* construct the return message */
answer = OBJ_NEW(opal_buffer_t);
if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &cmd, 1, ORTE_DFS_CMD_T))) {
ORTE_ERROR_LOG(rc);
return;
}
if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &req->rid, 1, OPAL_UINT64))) {
ORTE_ERROR_LOG(rc);
return;
}
if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &i64, 1, OPAL_INT64))) {
ORTE_ERROR_LOG(rc);
return;
}
/* send it */
if (0 > (rc = orte_rml.send_buffer_nb(&req->trk->requestor, answer,
ORTE_RML_TAG_DFS_DATA, 0,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(answer);
}
}
static void remote_read(int fd, short args, void *cbdata)
{
worker_req_t *req = (worker_req_t*)cbdata;
uint8_t *read_buf;
opal_buffer_t *answer;
orte_dfs_cmd_t cmd = ORTE_DFS_READ_CMD;
int64_t bytes_read;
int rc;
/* do the read */
opal_output_verbose(1, orte_dfs_base.output,
"%s issuing read",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
read_buf = (uint8_t*)malloc(req->nbytes);
bytes_read = read(req->trk->local_fd, read_buf, (long)req->nbytes);
if (0 < bytes_read) {
/* update our location */
req->trk->location += bytes_read;
}
/* construct the return message */
answer = OBJ_NEW(opal_buffer_t);
if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &cmd, 1, ORTE_DFS_CMD_T))) {
ORTE_ERROR_LOG(rc);
return;
}
if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &req->rid, 1, OPAL_UINT64))) {
ORTE_ERROR_LOG(rc);
return;
}
/* include the number of bytes read */
if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, &bytes_read, 1, OPAL_INT64))) {
ORTE_ERROR_LOG(rc);
return;
}
/* include the bytes read */
if (0 < bytes_read) {
if (OPAL_SUCCESS != (rc = opal_dss.pack(answer, read_buf, bytes_read, OPAL_UINT8))) {
ORTE_ERROR_LOG(rc);
return;
}
}
/* send it */
opal_output_verbose(1, orte_dfs_base.output,
"%s sending %ld bytes back to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(long)bytes_read,
ORTE_NAME_PRINT(&req->trk->requestor));
if (0 > (rc = orte_rml.send_buffer_nb(&req->trk->requestor, answer,
ORTE_RML_TAG_DFS_DATA, 0,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(answer);
return;
}
free(read_buf);
OBJ_RELEASE(req);
}

Просмотреть файл

@ -31,6 +31,8 @@ ORTE_MODULE_DECLSPEC extern orte_dfs_base_component_t mca_dfs_orted_component;
ORTE_DECLSPEC extern orte_dfs_base_module_t orte_dfs_orted_module;
extern int orte_dfs_orted_num_worker_threads;
END_C_DECLS
#endif /* MCA_dfs_orted_EXPORT_H */

Просмотреть файл

@ -24,6 +24,8 @@
const char *orte_dfs_orted_component_version_string =
"ORTE DFS orted MCA component version " ORTE_VERSION;
int orte_dfs_orted_num_worker_threads = 0;
/*
* Local functionality
*/
@ -61,6 +63,11 @@ orte_dfs_base_component_t mca_dfs_orted_component =
static int dfs_orted_open(void)
{
mca_base_component_t *c = &mca_dfs_orted_component.base_version;
mca_base_param_reg_int(c, "num_worker_threads",
"Number of worker threads to use for processing file requests",
false, false, 0, &orte_dfs_orted_num_worker_threads);
return ORTE_SUCCESS;
}