
Send packed buffers instead of using iovecs in common sm RML.

This commit will hopefully resolve the periodic bus errors that some MTT tests
have been encountering.

This commit was SVN r25692.
This commit is contained in:
Samuel Gutierrez 2012-01-05 00:11:59 +00:00
Parent b3bfae129b
Commit d1a44ecd34
4 changed files with 130 additions and 158 deletions
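
For context, the packed-buffer round trip that this patch switches to looks roughly like the sketch below. It is distilled from the hunks that follow and uses the same opal_dss / orte_rml calls that appear in the diff; the helper names (bcast_send_info / bcast_recv_info) are illustrative only, the include paths are approximate, and the ORTE_ERROR_LOG calls, return-code checks, and opal_progress_event bookkeeping of the real code are omitted.

#include "ompi_config.h"
#include "ompi/constants.h"
#include "opal/dss/dss.h"
#include "orte/mca/rml/rml.h"
#include "ompi/mca/common/sm/common_sm_rml.h"

/* root side: pack the queueing id string and the raw opal_shmem_ds_t,
 * then send the same buffer to every other local proc. */
static int bcast_send_info(opal_shmem_ds_t *ds_buf, ompi_proc_t **procs,
                           size_t num_local_procs, int tag, char *msg_id_str)
{
    opal_buffer_t *buffer = OBJ_NEW(opal_buffer_t);
    size_t p;
    if (NULL == buffer) {
        return OMPI_ERR_OUT_OF_RESOURCE;
    }
    opal_dss.pack(buffer, &msg_id_str, 1, OPAL_STRING);
    opal_dss.pack(buffer, ds_buf, (int32_t)sizeof(opal_shmem_ds_t), OPAL_BYTE);
    /* procs[1..num_local_procs-1] are the other local procs */
    for (p = 1; p < num_local_procs; ++p) {
        orte_rml.send_buffer(&(procs[p]->proc_name), buffer, tag, 0);
    }
    OBJ_RELEASE(buffer);
    return OMPI_SUCCESS;
}

/* non-root side: block on the buffer from procs[0] and unpack it in the
 * same order it was packed. */
static int bcast_recv_info(opal_shmem_ds_t *ds_buf, ompi_proc_t **procs,
                           int tag, char **rxed_msg_id_str)
{
    opal_buffer_t *buffer = OBJ_NEW(opal_buffer_t);
    int32_t n = 1;
    if (NULL == buffer) {
        return OMPI_ERR_OUT_OF_RESOURCE;
    }
    orte_rml.recv_buffer(&(procs[0]->proc_name), buffer, tag, 0);
    opal_dss.unpack(buffer, rxed_msg_id_str, &n, OPAL_STRING);
    n = (int32_t)sizeof(opal_shmem_ds_t);
    opal_dss.unpack(buffer, ds_buf, &n, OPAL_BYTE);
    OBJ_RELEASE(buffer);
    return OMPI_SUCCESS;
}

Compared with the removed iovec path, each non-root proc now simply blocks on its own buffered recv, which is why the pending-message list and the mutex in common_sm.c can be dropped, as the hunks below show.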

View file

@ -11,7 +11,7 @@
* All rights reserved.
* Copyright (c) 2007 Sun Microsystems, Inc. All rights reserved.
* Copyright (c) 2008-2010 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2010-2011 Los Alamos National Security, LLC.
* Copyright (c) 2010-2012 Los Alamos National Security, LLC.
* All rights reserved.
* $COPYRIGHT$
*
@ -20,11 +20,25 @@
* $HEADER$
*/
#include "ompi_config.h"
/* ASSUMING local process homogeneity with respect to all utilized shared memory
* facilities. that is, if one local process deems a particular shared memory
* facility acceptable, then ALL local processes should be able to utilize that
* facility. as it stands, this is an important point because one process
* dictates to all other local processes which common sm component will be
* selected based on its own, local run-time test.
*/
#ifdef HAVE_STRING_H
#include <string.h>
#endif
/* RML Messaging in common sm and Our Assumptions
* o MPI_Init is single threaded
* o this routine will not be called after MPI_Init.
*
* if these assumptions ever change, then we may need to add some support code
* that queues up RML messages that have arrived, but have not yet been
* consumed by the thread who is looking to complete its component
* initialization.
*/
#include "ompi_config.h"
#include "opal/align.h"
#include "opal/util/argv.h"
@ -32,6 +46,7 @@
#include "opal/runtime/opal_cr.h"
#endif
#include "orte/util/proc_info.h"
#include "orte/util/name_fns.h"
#include "orte/util/show_help.h"
#include "orte/runtime/orte_globals.h"
@ -43,14 +58,6 @@
#include "common_sm_rml.h"
/* ASSUMING local process homogeneity with respect to all utilized shared memory
* facilities. that is, if one local process deems a particular shared memory
* facility acceptable, then ALL local processes should be able to utilize that
* facility. as it stands, this is an important point because one process
* dictates to all other local processes which common sm component will be
* selected based on its own, local run-time test.
*/
OBJ_CLASS_INSTANCE(
mca_common_sm_module_t,
opal_list_item_t,
@ -58,17 +65,6 @@ OBJ_CLASS_INSTANCE(
NULL
);
/* list of RML messages that have arrived that have not yet been
* consumed by the thread who is looking to complete its component
* initialization based on the contents of the RML message.
*/
static opal_list_t pending_rml_msgs;
/* flag indicating whether or not pending_rml_msgs has been initialized */
static bool pending_rml_msgs_init = false;
/* lock to protect multiple instances of mca_common_sm_init() from being
* invoked simultaneously (because of RML usage).
*/
static opal_mutex_t mutex;
/* shared memory information used for initialization and setup. */
static opal_shmem_ds_t shmem_ds;
@ -154,7 +150,7 @@ mca_common_sm_init(ompi_proc_t **procs,
ompi_proc_t *temp_proc = NULL;
bool found_lowest = false;
size_t num_local_procs = 0, p = 0;
/* o reorder procs array to have all the local procs at the beginning.
* o look for the local proc with the lowest name.
* o determine the number of local procs.
@ -200,16 +196,6 @@ mca_common_sm_init(ompi_proc_t **procs,
ORTE_PROC_MY_NAME,
&(procs[0]->proc_name)));
/* lock here to prevent multiple threads from invoking this
* function simultaneously. the critical section we're protecting
* is usage of the RML in this block.
*/
opal_mutex_lock(&mutex);
if (!pending_rml_msgs_init) {
OBJ_CONSTRUCT(&(pending_rml_msgs), opal_list_t);
pending_rml_msgs_init = true;
}
/* figure out if i am the lowest rank in the group.
* if so, i will create the shared memory backing store
*/
@ -239,11 +225,10 @@ mca_common_sm_init(ompi_proc_t **procs,
}
/* send shmem info to the rest of the local procs. */
if (OMPI_SUCCESS != mca_common_sm_rml_info_bcast(
&shmem_ds, procs, num_local_procs,
OMPI_RML_TAG_SM_BACK_FILE_CREATED,
lowest_local_proc, file_name,
&(pending_rml_msgs))) {
if (OMPI_SUCCESS !=
mca_common_sm_rml_info_bcast(&shmem_ds, procs, num_local_procs,
OMPI_RML_TAG_SM_BACK_FILE_CREATED,
lowest_local_proc, file_name)) {
goto out;
}
@ -266,7 +251,6 @@ mca_common_sm_init(ompi_proc_t **procs,
}
out:
opal_mutex_unlock(&mutex);
return map;
}

View file

@ -11,7 +11,7 @@
* All rights reserved.
* Copyright (c) 2007 Sun Microsystems, Inc. All rights reserved.
* Copyright (c) 2008-2010 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2010-2011 Los Alamos National Security, LLC.
* Copyright (c) 2010-2012 Los Alamos National Security, LLC.
* All rights reserved.
* $COPYRIGHT$
*
@ -22,11 +22,11 @@
#include "ompi_config.h"
#ifdef HAVE_STRING_H
#include <string.h>
#endif
#include "opal/types.h"
#include "opal/dss/dss.h"
#include "orte/mca/rml/rml.h"
#include "orte/util/proc_info.h"
#include "orte/util/name_fns.h"
#include "orte/util/show_help.h"
#include "orte/runtime/orte_globals.h"
@ -36,58 +36,63 @@
#include "ompi/mca/dpm/dpm.h"
#include "ompi/mca/common/sm/common_sm_rml.h"
OBJ_CLASS_INSTANCE(
mca_common_sm_rml_pending_rml_msg_types_t,
opal_object_t,
NULL,
NULL
);
/* for debug purposes only */
#include <assert.h>
#ifdef HAVE_STRING_H
#include <string.h>
#endif
/* ////////////////////////////////////////////////////////////////////////// */
/**
* this routine assumes that sorted_procs is in the following state:
* o all the local procs at the beginning.
* o sorted_procs[0] is the lowest named process.
* o procs[0] is the lowest named process.
*/
int
mca_common_sm_rml_info_bcast(opal_shmem_ds_t *ds_buf,
mca_common_sm_rml_info_bcast(opal_shmem_ds_t *out_ds_buf,
ompi_proc_t **procs,
size_t num_procs,
size_t num_local_procs,
int tag,
bool bcast_root,
char *msg_id_str,
opal_list_t *pending_rml_msgs)
bool proc0,
char *msg_id_str)
{
int rc = OMPI_SUCCESS;
struct iovec iov[MCA_COMMON_SM_RML_MSG_LEN];
int rc = OMPI_SUCCESS, tmprc;
char *msg_id_str_to_tx = NULL;
int iovrc;
size_t p;
opal_buffer_t *buffer = NULL;
if (NULL == (msg_id_str_to_tx = (char *)calloc(OPAL_PATH_MAX,
sizeof(char)))) {
if (NULL == (buffer = OBJ_NEW(opal_buffer_t))) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
strncpy(msg_id_str_to_tx, msg_id_str, OPAL_PATH_MAX - 1);
/* let the first item be the queueing id name */
iov[0].iov_base = (ompi_iov_base_ptr_t)msg_id_str_to_tx;
iov[0].iov_len = (size_t)OPAL_PATH_MAX;
iov[1].iov_base = (ompi_iov_base_ptr_t)ds_buf;
iov[1].iov_len = sizeof(opal_shmem_ds_t);
/* figure out if i am the root proc in the group.
* if i am, bcast the message to the rest of the local procs.
*/
if (bcast_root) {
/* figure out if i am the root proc in the group. if i am, bcast the
* message to the rest of the local procs. */
if (proc0) {
size_t p;
/* pack the data that we are going to send. first the queueing id, then
* the shmem_ds buf. note that msg_id_str is used only for verifying
* "expected" common sm usage. see "RML Messaging and Our Assumptions"
* note in common_sm.c for more details. */
tmprc = opal_dss.pack(buffer, &msg_id_str, 1, OPAL_STRING);
if (OPAL_SUCCESS != tmprc) {
ORTE_ERROR_LOG(ORTE_ERR_PACK_FAILURE);
rc = OMPI_ERR_PACK_FAILURE;
goto out;
}
tmprc = opal_dss.pack(buffer, out_ds_buf,
(int32_t)sizeof(opal_shmem_ds_t),
OPAL_BYTE);
if (OPAL_SUCCESS != tmprc) {
ORTE_ERROR_LOG(ORTE_ERR_PACK_FAILURE);
rc = OMPI_ERR_PACK_FAILURE;
goto out;
}
opal_progress_event_users_increment();
/* first num_procs items should be local procs */
for (p = 1; p < num_procs; ++p) {
iovrc = orte_rml.send(&(procs[p]->proc_name), iov,
MCA_COMMON_SM_RML_MSG_LEN, tag, 0);
if ((ssize_t)(iov[0].iov_len + iov[1].iov_len) > iovrc) {
ORTE_ERROR_LOG(ORTE_ERR_COMM_FAILURE);
/* first num_local_procs items should be local procs */
for (p = 1; p < num_local_procs; ++p) {
/* a potential future optimization: use non-blocking routines */
tmprc = orte_rml.send_buffer(&(procs[p]->proc_name), buffer, tag,
0);
if (0 > tmprc) {
ORTE_ERROR_LOG(tmprc);
opal_progress_event_users_decrement();
rc = OMPI_ERROR;
goto out;
@ -95,61 +100,47 @@ mca_common_sm_rml_info_bcast(opal_shmem_ds_t *ds_buf,
}
opal_progress_event_users_decrement();
}
else { /* i am NOT the root ("lowest") proc */
opal_list_item_t *item;
mca_common_sm_rml_pending_rml_msg_types_t *rml_msg;
/* because a component query can be performed simultaneously in multiple
* threads, the RML messages may arrive in any order. so first check to
* see if we previously received a message for me.
*/
for (item = opal_list_get_first(pending_rml_msgs);
opal_list_get_end(pending_rml_msgs) != item;
item = opal_list_get_next(item)) {
rml_msg = (mca_common_sm_rml_pending_rml_msg_types_t *)item;
/* was the message for me? */
if (0 == strcmp(rml_msg->msg_id_str, msg_id_str)) {
opal_list_remove_item(pending_rml_msgs, item);
/* from ==============> to */
opal_shmem_ds_copy(&rml_msg->shmem_ds, ds_buf);
OBJ_RELEASE(item);
break;
}
/* i am NOT the root proc */
else {
int32_t num_vals;
/* bump up the libevent polling frequency while we're in this RML recv,
* just to ensure we're checking libevent frequently. */
opal_progress_event_users_increment();
tmprc = orte_rml.recv_buffer(&(procs[0]->proc_name), buffer, tag, 0);
opal_progress_event_users_decrement();
if (0 > tmprc) {
ORTE_ERROR_LOG(tmprc);
rc = OMPI_ERROR;
goto out;
}
/* if we didn't find a message already waiting, block on receiving from
* the RML.
*/
if (opal_list_get_end(pending_rml_msgs) == item) {
do {
/* bump up the libevent polling frequency while we're in this
* RML recv, just to ensure we're checking libevent frequently.
*/
opal_progress_event_users_increment();
iovrc = orte_rml.recv(&(procs[0]->proc_name), iov,
MCA_COMMON_SM_RML_MSG_LEN, tag, 0);
opal_progress_event_users_decrement();
if (iovrc < 0) {
ORTE_ERROR_LOG(ORTE_ERR_RECV_LESS_THAN_POSTED);
rc = OMPI_ERROR;
goto out;
}
/* was the message for me? if so, we're done */
if (0 == strcmp(msg_id_str_to_tx, msg_id_str)) {
break;
}
/* if not, put it on the pending list and try again */
if (NULL == (rml_msg =
OBJ_NEW(mca_common_sm_rml_pending_rml_msg_types_t)))
{
ORTE_ERROR_LOG(OMPI_ERR_OUT_OF_RESOURCE);
rc = OMPI_ERROR;
goto out;
}
/* not for me, so place on list */
/* from ========> to */
opal_shmem_ds_copy(ds_buf, &rml_msg->shmem_ds);
memcpy(rml_msg->msg_id_str, msg_id_str_to_tx, OPAL_PATH_MAX);
opal_list_append(pending_rml_msgs, &(rml_msg->super));
} while(1);
/* unpack the buffer */
num_vals = 1;
tmprc = opal_dss.unpack(buffer, &msg_id_str_to_tx, &num_vals,
OPAL_STRING);
if (0 > tmprc) {
ORTE_ERROR_LOG(ORTE_ERR_UNPACK_FAILURE);
rc = OMPI_ERROR;
goto out;
}
num_vals = (int32_t)sizeof(opal_shmem_ds_t);
tmprc = opal_dss.unpack(buffer, out_ds_buf, &num_vals, OPAL_BYTE);
if (0 > tmprc) {
ORTE_ERROR_LOG(ORTE_ERR_UNPACK_FAILURE);
rc = OMPI_ERROR;
goto out;
}
/* the message better be for me. if not, freak out because this
* probably means that common sm is being used in a new way that lies
* outside of our current scope of assumptions. see "RML Messaging and
* Our Assumptions" note in common_sm.c */
if (0 != strcmp(msg_id_str_to_tx, msg_id_str)) {
orte_show_help("help-mpi-common-sm.txt", "unexpected message id",
true, orte_process_info.nodename,
msg_id_str, msg_id_str_to_tx);
rc = OMPI_ERROR;
/* here for extra debug info only */
assert(0);
goto out;
}
}
@ -158,6 +149,6 @@ out:
free(msg_id_str_to_tx);
msg_id_str_to_tx = NULL;
}
OBJ_RELEASE(buffer);
return rc;
}

View file

@ -10,7 +10,7 @@
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2009-2010 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2010-2011 Los Alamos National Security, LLC.
* Copyright (c) 2010-2012 Los Alamos National Security, LLC.
* All rights reserved.
* $COPYRIGHT$
*
@ -33,33 +33,20 @@
#include "ompi/proc/proc.h"
#include "ompi/mca/common/sm/common_sm.h"
#define MCA_COMMON_SM_RML_MSG_LEN 2
BEGIN_C_DECLS
/**
* items on the pending_rml_msgs list
*/
typedef struct mca_common_sm_rml_pending_rml_msg_types_t {
opal_list_item_t super;
char msg_id_str[OPAL_PATH_MAX];
opal_shmem_ds_t shmem_ds;
} mca_common_sm_rml_pending_rml_msg_types_t;
/**
* routine used to send common sm initialization information to all local
* processes in procs.
*/
OMPI_DECLSPEC extern int
mca_common_sm_rml_info_bcast(opal_shmem_ds_t *ds_buf,
mca_common_sm_rml_info_bcast(opal_shmem_ds_t *out_ds_buf,
ompi_proc_t **procs,
size_t num_procs,
size_t num_local_procs,
int tag,
bool bcast_root,
char *msg_id_str,
opal_list_t *pending_rml_msgs);
bool proc0,
char *msg_id_str);
END_C_DECLS
#endif /* _COMMON_SM_RML_H_*/

View file

@ -1,13 +1,13 @@
# -*- text -*-
#
# Copyright (c) 2009-2010 Cisco Systems, Inc. All rights reserved.
# Copyright (c) 2010-2011 Los Alamos National Security, LLC.
# Copyright (c) 2010-2012 Los Alamos National Security, LLC.
# All rights reserved.
#
# $COPYRIGHT$
#
#
# Additional copyrights may follow
#
#
# $HEADER$
#
# This is the US/English help file for Open MPI's common shmem support.
@ -22,3 +22,13 @@ Open MPI, and if not, contact the Open MPI developers.
Requested size: %ul
Control seg size: %ul
Data seg alignment: %ul
#
[unexpected message id]
Open MPI received an unexpected message ID during common sm initialization.
This is likely an error in Open MPI itself. If you see this error, you should
see if there is an update available for Open MPI that addresses this issue, and
if not, contact the Open MPI developers.
Local Host: %s
Expected Message ID: %s
Message ID Received: %s