1
1
http://www.open-mpi.org/community/lists/devel/2014/01/13789.php

add support for async modex when requested.

cmr=v1.7.5:reviewer=jsquyres:subject=Add async modex support

This commit was SVN r30565.
Этот коммит содержится в:
Ralph Castain 2014-02-05 14:39:27 +00:00
родитель b7d10b3499
Коммит 1326ed704f
8 изменённых файлов: 129 добавлений и 9 удалений

Просмотреть файл

@ -139,6 +139,11 @@ setup_mpool_base_resources(mca_btl_sm_component_t *comp_ptr,
int fd = -1; int fd = -1;
ssize_t bread = 0; ssize_t bread = 0;
/* Wait for the file to be created */
while (0 != access(comp_ptr->sm_rndv_file_name, R_OK)) {
opal_progress();
}
if (-1 == (fd = open(comp_ptr->sm_mpool_rndv_file_name, O_RDONLY))) { if (-1 == (fd = open(comp_ptr->sm_mpool_rndv_file_name, O_RDONLY))) {
int err = errno; int err = errno;
opal_show_help("help-mpi-btl-sm.txt", "sys call fail", true, opal_show_help("help-mpi-btl-sm.txt", "sys call fail", true,
@ -188,6 +193,7 @@ sm_segment_attach(mca_btl_sm_component_t *comp_ptr)
opal_show_help("help-mpi-btl-sm.txt", "sys call fail", true, opal_show_help("help-mpi-btl-sm.txt", "sys call fail", true,
"open(2)", strerror(err), err); "open(2)", strerror(err), err);
rc = OMPI_ERR_IN_ERRNO; rc = OMPI_ERR_IN_ERRNO;
exit(1);
goto out; goto out;
} }
if ((ssize_t)sizeof(opal_shmem_ds_t) != (bread = if ((ssize_t)sizeof(opal_shmem_ds_t) != (bread =

Просмотреть файл

@ -1,5 +1,6 @@
/* /*
* Copyright (c) 2012 Los Alamos National Security, LLC. All rights reserved. * Copyright (c) 2012 Los Alamos National Security, LLC. All rights reserved.
* Copyright (c) 2014 Intel, Inc. All rights reserved.
* *
* $COPYRIGHT$ * $COPYRIGHT$
* *

Просмотреть файл

@ -26,6 +26,7 @@
#include "orte/mca/rmaps/rmaps_types.h" #include "orte/mca/rmaps/rmaps_types.h"
#include "orte/mca/rmaps/base/base.h" #include "orte/mca/rmaps/base/base.h"
#include "orte/mca/rml/base/rml_contact.h" #include "orte/mca/rml/base/rml_contact.h"
#include "orte/mca/state/state.h"
#include "orte/mca/routed/routed.h" #include "orte/mca/routed/routed.h"
#include "orte/util/name_fns.h" #include "orte/util/name_fns.h"
#include "orte/util/session_dir.h" #include "orte/util/session_dir.h"
@ -140,6 +141,9 @@ void ompi_rte_wait_for_debugger(void)
int ompi_rte_modex(ompi_rte_collective_t *coll) int ompi_rte_modex(ompi_rte_collective_t *coll)
{ {
/* mark that this process reached modex */
orte_grpcomm_base.modex_ready = true;
if ((orte_process_info.num_procs < ompi_hostname_cutoff) || if ((orte_process_info.num_procs < ompi_hostname_cutoff) ||
!ompi_rte_orte_direct_modex || !ompi_rte_orte_direct_modex ||
orte_standalone_operation) { orte_standalone_operation) {
@ -158,15 +162,27 @@ int ompi_rte_modex(ompi_rte_collective_t *coll)
* info we need, and we will retrieve the MPI-level info * info we need, and we will retrieve the MPI-level info
* only as requested. This will provide a faster startup * only as requested. This will provide a faster startup
* time since we won't do a massive allgather operation, * time since we won't do a massive allgather operation,
* but will make first-message connections slower. However, * but will make first-message connections slower.
* we still have to do a barrier op here to ensure that all
* procs have had time to store their modex info prior to
* receiving a request to provide it!
*/ */
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_framework.framework_output,
"%s using direct modex",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* process any pending requests */
ORTE_ACTIVATE_PROC_STATE(ORTE_PROC_MY_NAME, ORTE_PROC_STATE_MODEX_READY);
/* release the barrier */
if (NULL != coll->cbfunc) {
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_framework.framework_output, OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_framework.framework_output,
"%s using direct modex - executing barrier", "%s CALLING MODEX RELEASE",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
return orte_grpcomm.barrier(coll); coll->cbfunc(NULL, coll->cbdata);
} else {
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_framework.framework_output,
"%s NO MODEX RELEASE CBFUNC",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
}
/* flag the collective as complete */
coll->active = false;
return OMPI_SUCCESS;
} }
int ompi_rte_db_store(const orte_process_name_t *nm, const char* key, int ompi_rte_db_store(const orte_process_name_t *nm, const char* key,
@ -224,6 +240,10 @@ int ompi_rte_db_fetch(const struct ompi_proc_t *proc,
int rc; int rc;
if (OPAL_SUCCESS != (rc = opal_db.fetch((opal_identifier_t*)(&proc->proc_name), key, data, type))) { if (OPAL_SUCCESS != (rc = opal_db.fetch((opal_identifier_t*)(&proc->proc_name), key, data, type))) {
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_framework.framework_output,
"%s requesting direct modex from %s for %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&proc->proc_name), key));
/* if we couldn't fetch the data via the db, then we will attempt /* if we couldn't fetch the data via the db, then we will attempt
* to retrieve it from the target proc * to retrieve it from the target proc
*/ */
@ -254,6 +274,10 @@ int ompi_rte_db_fetch_pointer(const struct ompi_proc_t *proc,
/* if we couldn't fetch the data via the db, then we will attempt /* if we couldn't fetch the data via the db, then we will attempt
* to retrieve it from the target proc * to retrieve it from the target proc
*/ */
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_framework.framework_output,
"%s requesting direct modex from %s for %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&proc->proc_name), key));
if (ORTE_SUCCESS != (rc = direct_modex((orte_process_name_t*)&proc->proc_name, OPAL_SCOPE_PEER))) { if (ORTE_SUCCESS != (rc = direct_modex((orte_process_name_t*)&proc->proc_name, OPAL_SCOPE_PEER))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
return rc; return rc;
@ -283,6 +307,10 @@ int ompi_rte_db_fetch_multiple(const struct ompi_proc_t *proc,
/* if we couldn't fetch the data via the db, then we will attempt /* if we couldn't fetch the data via the db, then we will attempt
* to retrieve it from the target proc * to retrieve it from the target proc
*/ */
OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_framework.framework_output,
"%s requesting direct modex from %s for %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&proc->proc_name), key));
if (ORTE_SUCCESS != (rc = direct_modex((orte_process_name_t*)&proc->proc_name, OPAL_SCOPE_GLOBAL))) { if (ORTE_SUCCESS != (rc = direct_modex((orte_process_name_t*)&proc->proc_name, OPAL_SCOPE_GLOBAL))) {
ORTE_ERROR_LOG(rc); ORTE_ERROR_LOG(rc);
return rc; return rc;

Просмотреть файл

@ -11,7 +11,7 @@
* All rights reserved. * All rights reserved.
* Copyright (c) 2011-2013 Los Alamos National Security, LLC. * Copyright (c) 2011-2013 Los Alamos National Security, LLC.
* All rights reserved. * All rights reserved.
* Copyright (c) 2013 Intel, Inc. All rights reserved. * Copyright (c) 2013-2014 Intel, Inc. All rights reserved.
* $COPYRIGHT$ * $COPYRIGHT$
* *
* Additional copyrights may follow * Additional copyrights may follow
@ -62,8 +62,17 @@ typedef struct {
#if OPAL_HAVE_HWLOC #if OPAL_HAVE_HWLOC
hwloc_cpuset_t working_cpuset; hwloc_cpuset_t working_cpuset;
#endif #endif
bool modex_ready;
opal_list_t modex_requests;
} orte_grpcomm_base_t; } orte_grpcomm_base_t;
typedef struct {
opal_list_item_t super;
orte_process_name_t peer;
opal_scope_t scope;
} orte_grpcomm_modex_req_t;
OBJ_CLASS_DECLARATION(orte_grpcomm_modex_req_t);
typedef struct { typedef struct {
opal_object_t super; opal_object_t super;
opal_event_t ev; opal_event_t ev;
@ -114,6 +123,7 @@ ORTE_DECLSPEC int orte_grpcomm_base_pack_xcast(orte_jobid_t job,
opal_buffer_t *buffer, opal_buffer_t *buffer,
opal_buffer_t *message, opal_buffer_t *message,
orte_rml_tag_t tag); orte_rml_tag_t tag);
ORTE_DECLSPEC void orte_grpcomm_base_process_modex(int fd, short args, void *cbdata);
END_C_DECLS END_C_DECLS
#endif #endif

Просмотреть файл

@ -11,6 +11,7 @@
* All rights reserved. * All rights reserved.
* Copyright (c) 2011-2013 Los Alamos National Security, LLC. * Copyright (c) 2011-2013 Los Alamos National Security, LLC.
* All rights reserved. * All rights reserved.
* Copyright (c) 2014 Intel, Inc. All rights reserved.
* $COPYRIGHT$ * $COPYRIGHT$
* *
* Additional copyrights may follow * Additional copyrights may follow
@ -26,6 +27,8 @@
#include "opal/util/output.h" #include "opal/util/output.h"
#include "opal/mca/base/base.h" #include "opal/mca/base/base.h"
#include "orte/mca/state/state.h"
#include "orte/mca/grpcomm/base/base.h" #include "orte/mca/grpcomm/base/base.h"
@ -50,6 +53,8 @@ static int orte_grpcomm_base_close(void)
if( NULL != orte_grpcomm.finalize ) { if( NULL != orte_grpcomm.finalize ) {
orte_grpcomm.finalize(); orte_grpcomm.finalize();
} }
OBJ_DESTRUCT(&orte_grpcomm_base.active_colls);
OBJ_DESTRUCT(&orte_grpcomm_base.modex_requests);
#if OPAL_HAVE_HWLOC #if OPAL_HAVE_HWLOC
if (NULL != orte_grpcomm_base.working_cpuset) { if (NULL != orte_grpcomm_base.working_cpuset) {
@ -69,11 +74,18 @@ static int orte_grpcomm_base_open(mca_base_open_flag_t flags)
/* init globals */ /* init globals */
OBJ_CONSTRUCT(&orte_grpcomm_base.active_colls, opal_list_t); OBJ_CONSTRUCT(&orte_grpcomm_base.active_colls, opal_list_t);
orte_grpcomm_base.coll_id = 0; orte_grpcomm_base.coll_id = 0;
OBJ_CONSTRUCT(&orte_grpcomm_base.modex_requests, opal_list_t);
orte_grpcomm_base.modex_ready = false;
#if OPAL_HAVE_HWLOC #if OPAL_HAVE_HWLOC
orte_grpcomm_base.working_cpuset = NULL; orte_grpcomm_base.working_cpuset = NULL;
#endif #endif
/* register the modex processing event */
if (ORTE_PROC_IS_APP) {
orte_state.add_proc_state(ORTE_PROC_STATE_MODEX_READY, orte_grpcomm_base_process_modex, ORTE_MSG_PRI);
}
return mca_base_framework_components_open(&orte_grpcomm_base_framework, flags); return mca_base_framework_components_open(&orte_grpcomm_base_framework, flags);
} }
@ -146,3 +158,7 @@ OBJ_CLASS_INSTANCE(orte_grpcomm_collective_t,
OBJ_CLASS_INSTANCE(orte_grpcomm_caddy_t, OBJ_CLASS_INSTANCE(orte_grpcomm_caddy_t,
opal_object_t, opal_object_t,
NULL, NULL); NULL, NULL);
OBJ_CLASS_INSTANCE(orte_grpcomm_modex_req_t,
opal_list_item_t,
NULL, NULL);

Просмотреть файл

@ -14,7 +14,7 @@
* Copyright (c) 2011 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2011 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2011-2013 Los Alamos National Security, LLC. * Copyright (c) 2011-2013 Los Alamos National Security, LLC.
* All rights reserved. * All rights reserved.
* Copyright (c) 2013 Intel, Inc. All rights reserved. * Copyright (c) 2013-2014 Intel, Inc. All rights reserved.
* $COPYRIGHT$ * $COPYRIGHT$
* *
* Additional copyrights may follow * Additional copyrights may follow

Просмотреть файл

@ -315,6 +315,42 @@ static void app_recv(int status, orte_process_name_t* sender,
} }
} }
void orte_grpcomm_base_process_modex(int fd, short args, void *cbdata)
{
orte_grpcomm_modex_req_t *req;
opal_buffer_t *buf;
int rc;
OPAL_LIST_FOREACH(req, &orte_grpcomm_base.modex_requests, orte_grpcomm_modex_req_t) {
/* we always must send a response, even if nothing could be
* returned, to prevent the remote proc from hanging
*/
buf = OBJ_NEW(opal_buffer_t);
/* pack our process name so the remote end can use the std
* unpacking routine
*/
if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) {
ORTE_ERROR_LOG(rc);
goto respond;
}
/* collect the desired data */
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_pack_modex_entries(buf, req->scope))) {
ORTE_ERROR_LOG(rc);
}
respond:
if (ORTE_SUCCESS != (rc = orte_rml.send_buffer_nb(&req->peer, buf,
ORTE_RML_TAG_DIRECT_MODEX_RESP,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(buf);
}
}
}
static void direct_modex(int status, orte_process_name_t* sender, static void direct_modex(int status, orte_process_name_t* sender,
opal_buffer_t* buffer, orte_rml_tag_t tag, opal_buffer_t* buffer, orte_rml_tag_t tag,
void* cbdata) void* cbdata)
@ -322,6 +358,7 @@ static void direct_modex(int status, orte_process_name_t* sender,
opal_buffer_t *buf; opal_buffer_t *buf;
int rc, cnt; int rc, cnt;
opal_scope_t scope; opal_scope_t scope;
orte_grpcomm_modex_req_t *req;
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output, OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
"%s providing direct modex for %s", "%s providing direct modex for %s",
@ -340,6 +377,26 @@ static void direct_modex(int status, orte_process_name_t* sender,
goto respond; goto respond;
} }
/* if we haven't made it to our own modex, then we may
* not yet have all the required info
*/
if (!orte_grpcomm_base.modex_ready) {
/* we are in an event, so it is safe to access
* the global list of requests - record this one.
* Note that we don't support multiple requests
* pending from the same proc as we can't know
* which thread to return the data to, so we
* require that the remote proc only allow
* one thread at a time to call modex_recv
*/
req = OBJ_NEW(orte_grpcomm_modex_req_t);
req->peer = *sender;
req->scope = scope;
opal_list_append(&orte_grpcomm_base.modex_requests, &req->super);
OBJ_RELEASE(buf);
return;
}
/* pack our process name so the remote end can use the std /* pack our process name so the remote end can use the std
* unpacking routine * unpacking routine
*/ */

Просмотреть файл

@ -11,6 +11,7 @@
* All rights reserved. * All rights reserved.
* Copyright (c) 2011-2012 Los Alamos National Security, LLC. All rights * Copyright (c) 2011-2012 Los Alamos National Security, LLC. All rights
* reserved. * reserved.
* Copyright (c) 2014 Intel, Inc. All rights reserved.
* $COPYRIGHT$ * $COPYRIGHT$
* *
* Additional copyrights may follow * Additional copyrights may follow
@ -51,6 +52,7 @@ typedef uint32_t orte_proc_state_t;
#define ORTE_PROC_STATE_REGISTERED 5 /* proc registered sync */ #define ORTE_PROC_STATE_REGISTERED 5 /* proc registered sync */
#define ORTE_PROC_STATE_IOF_COMPLETE 6 /* io forwarding pipes have closed */ #define ORTE_PROC_STATE_IOF_COMPLETE 6 /* io forwarding pipes have closed */
#define ORTE_PROC_STATE_WAITPID_FIRED 7 /* waitpid fired on process */ #define ORTE_PROC_STATE_WAITPID_FIRED 7 /* waitpid fired on process */
#define ORTE_PROC_STATE_MODEX_READY 8 /* all modex info has been stored */
/* /*
* Define a "boundary" so we can easily and quickly determine * Define a "boundary" so we can easily and quickly determine