
Do some cleanup of the way we handle modex data. Identify data that needs to be shared with peers in my job vs. data that needs to be shared with non-peers - there is no point in sharing extra data. When we share data with some process(es) from another job, we cannot know in advance what info they have or lack, so we have to share everything just in case. This limits the optimization we can do for things like comm_spawn.
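For illustration only (not part of this commit), a process using the new scoping might tag its own data as sketched below - the keys, values, and helper function are hypothetical:

/* sketch, assuming the opal_db framework is initialized and "me"
 * identifies this process */
static int share_my_modex(opal_identifier_t *me)
{
    int rc;
    opal_hwloc_locality_t loc = OPAL_PROC_ON_NODE;

    /* connection data: any proc, in any job, may need this */
    if (OPAL_SUCCESS != (rc = opal_db.store(me, OPAL_SCOPE_GLOBAL,
                                            "ex.endpoint", "10.0.0.1:5000",
                                            OPAL_STRING))) {
        return rc;
    }
    /* my in-job peers already received this at launch, so mark it
     * for non-peers only - peer exchanges stay small */
    if (OPAL_SUCCESS != (rc = opal_db.store(me, OPAL_SCOPE_NON_PEER,
                                            "ex.hostname", "node01",
                                            OPAL_STRING))) {
        return rc;
    }
    /* locally computed values never leave this process */
    return opal_db.store(me, OPAL_SCOPE_INTERNAL, OPAL_DB_LOCALITY,
                         &loc, OPAL_HWLOC_LOCALITY_T);
}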

Create a new required key in the OMPI layer for retrieving a "node id" from the database. ALL RTEs MUST DEFINE THIS KEY. This allows us to compute locality in the MPI layer, which is necessary when we do things like intercomm_create.
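A minimal sketch of the contract, using the ORTE mapping this commit adds as the reference; the fetch fragment assumes an ompi_proc_t *proc and an opal_hwloc_locality_t locality variable in scope:

/* each RTE maps the required key onto its own notion of a node id;
 * ORTE uses the vpid of the daemon hosting the proc */
#define OMPI_RTE_NODE_ID   ORTE_DB_DAEMON_VPID
#define OMPI_RTE_MY_NODEID ORTE_PROC_MY_DAEMON->vpid

/* the MPI layer can then compute locality without RTE-specific code */
ompi_vpid_t nodeid, *nptr = &nodeid;
if (OPAL_SUCCESS == opal_db.fetch((opal_identifier_t*)&proc->proc_name,
                                  OMPI_RTE_NODE_ID, (void**)&nptr, OPAL_UINT32)) {
    locality = (nodeid == OMPI_RTE_MY_NODEID) ? OPAL_PROC_ON_NODE
                                              : OPAL_PROC_NON_LOCAL;
}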

cmr:v1.7.4:reviewer=rhc:subject=Cleanup handling of modex data

This commit was SVN r29274.
Ralph Castain 2013-09-27 00:37:49 +00:00
parent bc92c260ca
commit d565a76814
31 changed files with 599 additions and 934 deletions

View File

@ -1630,47 +1630,6 @@ static void timeout_cb(int fd, short args, void *cbdata)
OBJ_RELEASE(req);
}
static int pack_request(opal_buffer_t *buf, ompi_group_t *group)
{
int rc;
/* pack the MPI info */
ompi_proc_pack(group->grp_proc_pointers, 1, false, buf);
/* pack our hostname */
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &orte_process_info.nodename, 1, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* pack our node rank */
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &orte_process_info.my_node_rank, 1, ORTE_NODE_RANK))) {
ORTE_ERROR_LOG(rc);
return rc;
}
/* pack our local rank */
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &orte_process_info.my_local_rank, 1, ORTE_LOCAL_RANK))) {
ORTE_ERROR_LOG(rc);
return rc;
}
#if OPAL_HAVE_HWLOC
/* pack our binding info so other procs can determine our locality */
if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &orte_process_info.cpuset, 1, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
return rc;
}
#endif
/* pack the modex entries we have received */
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_pack_modex_entries(buf))) {
ORTE_ERROR_LOG(rc);
return rc;
}
return ORTE_SUCCESS;
}
static void process_request(orte_process_name_t* sender,
opal_buffer_t *buffer,
bool connector,
@ -1686,10 +1645,6 @@ static void process_request(orte_process_name_t* sender,
opal_buffer_t *xfer;
int cnt, rc;
uint32_t id;
char *hostname;
orte_node_rank_t node_rank;
orte_local_rank_t local_rank;
opal_hwloc_locality_t locality;
OPAL_OUTPUT_VERBOSE((2, ompi_dpm_base_framework.framework_output,
"%s dpm:pconprocess: PROCESS REQUEST: %s",
@ -1718,118 +1673,19 @@ static void process_request(orte_process_name_t* sender,
* and then call PML add_procs
*/
if (0 < new_proc_len) {
/* unpack the peer's hostname and store it */
cnt=1;
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &hostname, &cnt, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if (ORTE_SUCCESS != (rc = opal_db.store((opal_identifier_t*)sender, OPAL_DB_INTERNAL, ORTE_DB_HOSTNAME, hostname, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* unpack the node rank */
cnt = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &node_rank, &cnt, ORTE_NODE_RANK))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if (ORTE_SUCCESS != (rc = opal_db.store((opal_identifier_t*)sender, OPAL_DB_INTERNAL, ORTE_DB_NODERANK, &node_rank, ORTE_NODE_RANK))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* unpack the local rank */
cnt = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &local_rank, &cnt, ORTE_LOCAL_RANK))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if (ORTE_SUCCESS != (rc = opal_db.store((opal_identifier_t*)sender, OPAL_DB_INTERNAL, ORTE_DB_LOCALRANK, &local_rank, ORTE_LOCAL_RANK))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
/* compute the locality and store in the database */
#if OPAL_HAVE_HWLOC
{
char *cpuset;
/* unpack and store the cpuset - could be NULL */
cnt = 1;
if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &cpuset, &cnt, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
if (ORTE_SUCCESS != (rc = opal_db.store((opal_identifier_t*)sender, OPAL_DB_INTERNAL, ORTE_DB_CPUSET, cpuset, OPAL_STRING))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
OPAL_OUTPUT_VERBOSE((2, ompi_dpm_base_framework.framework_output,
"%s dpm:pconprocess: setting proc %s cpuset %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender), cpuset));
if (0 != strcmp(hostname, orte_process_info.nodename)) {
/* this is on a different node, then mark as non-local */
OPAL_OUTPUT_VERBOSE((5, ompi_dpm_base_framework.framework_output,
"%s dpm:pconprocess: setting proc %s locale NONLOCAL",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender)));
locality = OPAL_PROC_NON_LOCAL;
} else if (NULL == cpuset || NULL == orte_process_info.cpuset) {
/* one or both of us is not bound, so all we can say is we are on the
* same node
*/
locality = OPAL_PROC_ON_NODE;
} else {
/* determine relative location on our node */
locality = opal_hwloc_base_get_relative_locality(opal_hwloc_topology,
orte_process_info.cpuset,
cpuset);
OPAL_OUTPUT_VERBOSE((5, ompi_dpm_base_framework.framework_output,
"%s dpm:pconprocess: setting proc %s locale %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender),
opal_hwloc_base_print_locality(locality)));
}
}
#else
if (0 != strcmp(hostname, orte_process_info.nodename)) {
/* this is on a different node, then mark as non-local */
OPAL_OUTPUT_VERBOSE((5, ompi_dpm_base_framework.framework_output,
"%s dpm:pconprocess: setting proc %s locale NONLOCAL",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender)));
locality = OPAL_PROC_NON_LOCAL;
} else {
/* must be on our node */
locality = OPAL_PROC_ON_NODE;
}
#endif
if (ORTE_SUCCESS != (rc = opal_db.store((opal_identifier_t*)sender, OPAL_DB_INTERNAL, ORTE_DB_LOCALITY, &locality, OPAL_HWLOC_LOCALITY_T))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
OPAL_OUTPUT_VERBOSE((5, ompi_dpm_base_framework.framework_output,
"%s dpm:pconprocess: adding modex entry for proc %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(sender)));
/* process the modex info */
if (ORTE_SUCCESS != (rc = orte_grpcomm_base_update_modex_entries(sender, buffer))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_framework.framework_output,
"%s dpm:pconprocess: process modex",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
orte_grpcomm_base_store_modex(buffer, NULL);
OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_framework.framework_output,
"%s dpm:pconprocess: adding procs",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
if (OMPI_SUCCESS != (rc = MCA_PML_CALL(add_procs(new_proc_list, new_proc_len)))) {
ORTE_ERROR_LOG(rc);
goto cleanup;
}
OPAL_OUTPUT_VERBOSE((3, ompi_dpm_base_framework.framework_output,
"%s dpm:orte:pconnect new procs added",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
@ -1850,7 +1706,7 @@ static void process_request(orte_process_name_t* sender,
goto cleanup;
}
/* pack the remaining info */
if (ORTE_SUCCESS != ompi_proc_pack(group->grp_proc_pointers, 1, true, xfer)) {
OBJ_RELEASE(xfer);
goto cleanup;
}
@ -2039,7 +1895,7 @@ static int dpm_pconnect(char *port,
return rc;
}
/* pack the request info */
if (ORTE_SUCCESS != ompi_proc_pack(group->grp_proc_pointers, 1, true, buf)) {
OBJ_RELEASE(buf);
OPAL_THREAD_LOCK(&ompi_dpm_port_mutex);
opal_list_remove_item(&orte_dpm_connectors, &connector->super);

View File

@ -54,6 +54,8 @@ typedef orte_ns_cmp_bitmask_t ompi_rte_cmp_bitmask_t;
#define OMPI_NAME ORTE_NAME
#define OMPI_PROCESS_NAME_HTON ORTE_PROCESS_NAME_HTON
#define OMPI_PROCESS_NAME_NTOH ORTE_PROCESS_NAME_NTOH
#define OMPI_RTE_NODE_ID ORTE_DB_DAEMON_VPID
#define OMPI_RTE_MY_NODEID ORTE_PROC_MY_DAEMON->vpid
/* Collective objects and operations */
#define ompi_rte_collective_t orte_grpcomm_collective_t

View File

@ -139,7 +139,8 @@ void ompi_rte_wait_for_debugger(void)
int ompi_rte_db_store(const orte_process_name_t *nm, const char* key,
const void *data, opal_data_type_t type)
{
/* MPI connection data is to be shared with ALL other processes */
return opal_db.store((opal_identifier_t*)nm, OPAL_SCOPE_GLOBAL, key, data, type);
}
int ompi_rte_db_fetch(const orte_process_name_t *nm,
@ -185,7 +186,9 @@ int ompi_rte_db_fetch_multiple(const orte_process_name_t *nm,
ompi_proc_t *proct;
int rc;
/* MPI processes are only concerned with shared info */
if (OPAL_SUCCESS != (rc = opal_db.fetch_multiple((opal_identifier_t*)nm,
OPAL_SCOPE_GLOBAL, key, kvs))) {
return rc;
}
/* update the hostname */

View File

@ -191,6 +191,13 @@ END_C_DECLS
BEGIN_C_DECLS
/* Each RTE is required to define a DB key for identifying the node
* upon which a process resides, and for providing this information
* for each process
*
* #define OMPI_RTE_NODE_ID
*/
/* Communication tags */
#define OMPI_RML_TAG_UDAPL OMPI_RML_TAG_BASE+1
#define OMPI_RML_TAG_OPENIB OMPI_RML_TAG_BASE+2

View File

@ -126,6 +126,62 @@ int ompi_proc_init(void)
return OMPI_SUCCESS;
}
static int ompi_proc_set_locality(ompi_proc_t *proc)
{
opal_hwloc_locality_t *hwlocale, locality;
ompi_vpid_t vpid, *vptr;
int ret;
/* get the locality information - do not use modex recv for
* this request as that will automatically cause the hostname
* to be loaded as well
*/
hwlocale = &(proc->proc_flags);
if (OMPI_SUCCESS == opal_db.fetch((opal_identifier_t*)&proc->proc_name, OPAL_DB_LOCALITY,
(void**)&hwlocale, OPAL_HWLOC_LOCALITY_T)) {
return OMPI_SUCCESS;
}
/* if we don't already have it, compute and save it for future use */
vptr = &vpid;
if (OMPI_SUCCESS != (ret = opal_db.fetch((opal_identifier_t*)&proc->proc_name, OMPI_RTE_NODE_ID,
(void**)&vptr, OPAL_UINT32))) {
return ret;
}
/* if we are on different nodes, then we are non-local */
if (vpid != OMPI_RTE_MY_NODEID) {
locality = OPAL_PROC_NON_LOCAL;
} else {
#if OPAL_HAVE_HWLOC
{
char *cpu_bitmap;
/* retrieve the binding for the other proc */
if (OMPI_SUCCESS != opal_db.fetch((opal_identifier_t*)&proc->proc_name, OPAL_DB_CPUSET,
(void**)&cpu_bitmap, OPAL_STRING)) {
/* we don't know their cpuset, so nothing more we can say */
locality = OPAL_PROC_ON_NODE;
} else if (NULL == cpu_bitmap || NULL == ompi_process_info.cpuset) {
/* one or both of us is not bound, so all we can say is we are on the
* same node
*/
locality = OPAL_PROC_ON_NODE;
} else {
/* we share a node - see what else we share */
locality = opal_hwloc_base_get_relative_locality(opal_hwloc_topology,
ompi_process_info.cpuset,
cpu_bitmap);
}
}
#else
/* all we know is that we share this node */
locality = OPAL_PROC_ON_NODE;
#endif
}
ret = opal_db.store((opal_identifier_t*)&proc->proc_name, OPAL_SCOPE_INTERNAL,
OPAL_DB_LOCALITY, &locality, OPAL_HWLOC_LOCALITY_T);
return ret;
}
/**
* The process creation is split into two steps. The second step
@ -141,7 +197,6 @@ int ompi_proc_complete_init(void)
ompi_proc_t *proc = NULL;
opal_list_item_t *item = NULL;
int ret, errcode = OMPI_SUCCESS;
OPAL_THREAD_LOCK(&ompi_proc_lock);
@ -151,13 +206,8 @@ int ompi_proc_complete_init(void)
proc = (ompi_proc_t*)item;
if (proc->proc_name.vpid != OMPI_PROC_MY_NAME->vpid) {
/* get the locality information */
ret = ompi_proc_set_locality(proc);
if (OMPI_SUCCESS != ret) {
errcode = ret;
break;
@ -374,7 +424,6 @@ int ompi_proc_refresh(void) {
opal_list_item_t *item = NULL;
ompi_vpid_t i = 0;
int ret=OMPI_SUCCESS;
OPAL_THREAD_LOCK(&ompi_proc_lock);
@ -395,13 +444,8 @@ int ompi_proc_refresh(void) {
proc->proc_hostname = ompi_process_info.nodename;
proc->proc_arch = opal_local_arch;
} else {
/* get the locality information */
ret = ompi_proc_set_locality(proc);
if (OMPI_SUCCESS != ret) {
break;
}
@ -468,22 +512,25 @@ ompi_proc_pack(ompi_proc_t **proclist, int proclistsize,
OPAL_THREAD_UNLOCK(&ompi_proc_lock);
return rc;
}
if (full_info) {
int32_t num_entries;
opal_value_t *kv;
opal_list_t data;
/* fetch all global info we know about the peer - while
* the remote procs may already know some of it, we cannot
* be certain they do. So we must include a full dump of
* everything we know about this proc, excluding INTERNAL
* data that each process computes about its peers
*/
OBJ_CONSTRUCT(&data, opal_list_t);
rc = opal_db.fetch_multiple((opal_identifier_t*)&proclist[i]->proc_name,
OPAL_SCOPE_GLOBAL, NULL, &data);
if (OPAL_SUCCESS != rc) {
OMPI_ERROR_LOG(rc);
num_entries = 0;
} else {
/* count the number of entries we will send */
num_entries = opal_list_get_size(&data);
}
@ -603,7 +650,7 @@ ompi_proc_unpack(opal_buffer_t* buf,
free(newprocs);
return rc;
}
if (!full_info) {
rc = opal_dss.unpack(buf, &new_arch, &count, OPAL_UINT32);
if (rc != OPAL_SUCCESS) {
OMPI_ERROR_LOG(rc);
@ -628,7 +675,7 @@ ompi_proc_unpack(opal_buffer_t* buf,
*/
newprocs[newprocs_len++] = plist[i];
if (full_info) {
int32_t num_recvd_entries;
orte_std_cntr_t cnt;
orte_std_cntr_t j;
@ -656,26 +703,36 @@ ompi_proc_unpack(opal_buffer_t* buf,
OBJ_RELEASE(kv);
} else {
/* store it in the database */
if (OPAL_SUCCESS != (rc = opal_db.store_pointer((opal_identifier_t*)&new_name, kv))) {
OMPI_ERROR_LOG(rc);
OBJ_RELEASE(kv);
}
/* do not release the kv - the db holds that pointer */
}
}
#if OPAL_ENABLE_HETEROGENEOUS_SUPPORT
rc = opal_db.fetch((opal_identifier_t*)&new_name, "OMPI_ARCH",
(void**)&new_arch, OPAL_UINT32);
if (OPAL_SUCCESS != rc) {
new_arch = opal_local_arch;
}
#else
new_arch = opal_local_arch;
#endif
if (ompi_process_info.num_procs < ompi_hostname_cutoff) {
/* retrieve the hostname */
rc = opal_db.fetch_pointer((opal_identifier_t*)&new_name, ORTE_DB_HOSTNAME,
(void**)&new_hostname, OPAL_STRING);
if (OPAL_SUCCESS != rc) {
new_hostname = NULL;
}
} else {
/* just set the hostname to NULL for now - we'll fill it in
* as modex_recv's are called for procs we will talk to
*/
new_hostname = NULL;
}
}
/* update all the values */
plist[i]->proc_arch = new_arch;
/* if arch is different than mine, create a new convertor for this proc */
@ -703,7 +760,7 @@ ompi_proc_unpack(opal_buffer_t* buf,
plist[i]->proc_hostname = new_hostname;
} else {
if (full_info) {
int32_t num_recvd_entries;
orte_std_cntr_t j, cnt;

View File

@ -256,6 +256,7 @@ int opal_dss_copy_value(opal_value_t **dest, opal_value_t *src,
if (NULL != src->key) {
p->key = strdup(src->key);
}
p->scope = src->scope;
p->type = src->type;
/* copy the right field */

View File

@ -71,6 +71,7 @@ static void opal_value_construct(opal_value_t* ptr)
{
ptr->key = NULL;
ptr->type = OPAL_UNDEF;
ptr->scope = OPAL_SCOPE_UNDEF;
}
static void opal_value_destruct(opal_value_t* ptr)
{

View File

@ -662,6 +662,9 @@ int opal_dss_pack_value(opal_buffer_t *buffer, const void *src,
if (OPAL_SUCCESS != (ret = opal_dss_pack_string(buffer, &ptr[i]->key, 1, OPAL_STRING))) {
return ret;
}
if (OPAL_SUCCESS != (ret = opal_dss_pack_data_type(buffer, &ptr[i]->scope, 1, OPAL_DATA_SCOPE_T))) {
return ret;
}
if (OPAL_SUCCESS != (ret = opal_dss_pack_data_type(buffer, &ptr[i]->type, 1, OPAL_DATA_TYPE))) {
return ret;
}

View File

@ -513,6 +513,7 @@ int opal_dss_print_node_stat(char **output, char *prefix, opal_node_stats_t *src
int opal_dss_print_value(char **output, char *prefix, opal_value_t *src, opal_data_type_t type)
{
char *prefx;
char *scope;
/* deal with NULL prefix */
if (NULL == prefix) asprintf(&prefx, " ");
@ -525,28 +526,50 @@ int opal_dss_print_value(char **output, char *prefix, opal_value_t *src, opal_da
return OPAL_SUCCESS;
}
if (OPAL_SCOPE_UNDEF == src->scope) {
scope = "UNDEF";
} else if (OPAL_SCOPE_PEER == src->scope) {
scope = "PEER";
} else if (OPAL_SCOPE_NON_PEER == src->scope) {
scope = "NON_PEER";
} else if (OPAL_SCOPE_GLOBAL == src->scope) {
scope = "GLOBAL";
} else if (OPAL_SCOPE_INTERNAL == src->scope) {
scope = "INTERNAL";
} else if (OPAL_SCOPE_ALL == src->scope) {
scope = "ALL";
} else {
scope = "UNKNOWN";
}
switch (src->type) {
case OPAL_STRING:
asprintf(output, "%sOPAL_VALUE: Data type: OPAL_STRING\tKey: %s\tScope: %s\tValue: %s",
prefx, src->key, scope, src->data.string);
break;
case OPAL_INT16:
asprintf(output, "%sOPAL_VALUE: Data type: OPAL_INT16\tKey: %s\tScope: %s\tValue: %d",
prefx, src->key, scope, (int)src->data.int16);
break;
case OPAL_INT32:
asprintf(output, "%sOPAL_VALUE: Data type: OPAL_INT32\tKey: %s\tScope: %s\tValue: %d",
prefx, src->key, scope, src->data.int32);
break;
case OPAL_PID:
asprintf(output, "%sOPAL_VALUE: Data type: OPAL_PID\tKey: %s\tScope: %s\tValue: %lu",
prefx, src->key, scope, (unsigned long)src->data.pid);
break;
case OPAL_FLOAT:
asprintf(output, "%sOPAL_VALUE: Data type: OPAL_FLOAT\tKey: %s\tScope: %s\tValue: %f",
prefx, src->key, scope, src->data.fval);
break;
case OPAL_TIMEVAL:
asprintf(output, "%sOPAL_VALUE: Data type: OPAL_TIMEVAL\tKey: %s\tScope: %s\tValue: %ld.%06ld",
prefx, src->key, scope, (long)src->data.tv.tv_sec, (long)src->data.tv.tv_usec);
break;
default:
asprintf(output, "%sOPAL_VALUE: Data type: UNKNOWN\tKey: %s\tScope: %s\tValue: UNPRINTABLE",
prefx, src->key, scope);
break;
}
free(prefx);

View File

@ -90,10 +90,30 @@ typedef struct {
#define OPAL_VALUE2_GREATER -1
#define OPAL_EQUAL 0
/* define a flag to indicate the scope of data being
* stored in the database. The following options are supported:
*
* PEER: data to be shared with our peers
* NON_PEER: data to be shared only with non-peer
* processes (i.e., processes from other jobs)
* GLOBAL: data to be shared with all processes
* INTERNAL: data is to be stored only within this process
* ALL: any of the above
*/
typedef uint8_t opal_scope_t;
#define OPAL_SCOPE_UNDEF 0x00
#define OPAL_SCOPE_PEER 0x01
#define OPAL_SCOPE_NON_PEER 0x02
#define OPAL_SCOPE_GLOBAL 0x03
#define OPAL_SCOPE_INTERNAL 0x80
#define OPAL_SCOPE_ALL 0xff
#define OPAL_DATA_SCOPE_T OPAL_UINT8
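Because the scopes are bit flags (GLOBAL is PEER|NON_PEER, ALL matches everything), scope matching reduces to a mask test - a minimal sketch of the comparison the hash component performs further below:

/* sketch: does a stored value satisfy a requested fetch scope? */
static int scope_matches(opal_scope_t requested, opal_scope_t stored)
{
    return 0 != (requested & stored);
}
/* scope_matches(OPAL_SCOPE_GLOBAL, OPAL_SCOPE_PEER)     -> 1 (0x03 & 0x01) */
/* scope_matches(OPAL_SCOPE_GLOBAL, OPAL_SCOPE_INTERNAL) -> 0 (0x03 & 0x80) */
/* scope_matches(OPAL_SCOPE_ALL,    OPAL_SCOPE_INTERNAL) -> 1 (0xff & 0x80) */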
/* Data value object */
typedef struct {
opal_list_item_t super; /* required for this to be on lists */
char *key; /* key string */
opal_scope_t scope;
opal_data_type_t type; /* the type of value stored */
union {
uint8_t byte;

View File

@ -890,6 +890,10 @@ int opal_dss_unpack_value(opal_buffer_t *buffer, void *dest,
return ret;
}
m=1;
if (OPAL_SUCCESS != (ret = opal_dss_unpack_data_type(buffer, &ptr[i]->scope, &m, OPAL_DATA_SCOPE_T))) {
return ret;
}
m=1;
if (OPAL_SUCCESS != (ret = opal_dss_unpack_data_type(buffer, &ptr[i]->type, &m, OPAL_DATA_TYPE))) {
return ret;
}

View File

@ -41,18 +41,20 @@ typedef struct {
OBJ_CLASS_DECLARATION(opal_db_active_module_t);
typedef struct {
opal_identifier_t my_id;
bool id_set;
opal_list_t store_order;
opal_list_t fetch_order;
} opal_db_base_t;
OPAL_DECLSPEC extern opal_db_base_t opal_db_base;
OPAL_DECLSPEC void opal_db_base_set_id(const opal_identifier_t *proc);
OPAL_DECLSPEC int opal_db_base_store(const opal_identifier_t *proc,
opal_scope_t scope,
const char *key, const void *object,
opal_data_type_t type);
OPAL_DECLSPEC int opal_db_base_store_pointer(const opal_identifier_t *proc,
opal_value_t *kv);
OPAL_DECLSPEC int opal_db_base_fetch(const opal_identifier_t *proc,
const char *key, void **data,
@ -61,14 +63,13 @@ OPAL_DECLSPEC int opal_db_base_fetch_pointer(const opal_identifier_t *proc,
const char *key,
void **data, opal_data_type_t type);
OPAL_DECLSPEC int opal_db_base_fetch_multiple(const opal_identifier_t *proc,
opal_scope_t scope,
const char *key,
opal_list_t *kvs);
OPAL_DECLSPEC int opal_db_base_remove_data(const opal_identifier_t *proc,
const char *key);
OPAL_DECLSPEC int opal_db_base_add_log(const char *table,
const opal_value_t *kvs, int nkvs);
OPAL_DECLSPEC void opal_db_base_commit(const opal_identifier_t *proc);
END_C_DECLS

View File

@ -1,5 +1,6 @@
/*
* Copyright (c) 2012-2013 Los Alamos National Security, Inc. All rights reserved.
* Copyright (c) 2013 Intel Inc. All rights reserved
* $COPYRIGHT$
*
* Additional copyrights may follow
@ -21,8 +22,15 @@
#include "opal/mca/db/base/base.h"
void opal_db_base_set_id(const opal_identifier_t *proc)
{
/* to protect alignment, copy the data across */
memcpy(&opal_db_base.my_id, proc, sizeof(opal_identifier_t));
opal_db_base.id_set = true;
}
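As an illustration (assuming the RTE's process name fits the 64-bit opal_identifier_t, as ORTE's does), an RTE would register its identity once during init:

opal_identifier_t myid;
memcpy(&myid, ORTE_PROC_MY_NAME, sizeof(opal_identifier_t));
opal_db.set_id(&myid);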
int opal_db_base_store(const opal_identifier_t *proc,
opal_scope_t scope,
const char *key, const void *object,
opal_data_type_t type)
{
@ -30,13 +38,17 @@ int opal_db_base_store(const opal_identifier_t *proc,
opal_db_active_module_t *mod;
int rc;
if (!opal_db_base.id_set) {
return OPAL_ERR_FATAL;
}
/* cycle thru the active modules until one agrees to perform the op */
did_op = false;
OPAL_LIST_FOREACH(mod, &opal_db_base.store_order, opal_db_active_module_t) {
if (NULL == mod->module->store) {
continue;
}
if (OPAL_SUCCESS == (rc = mod->module->store(proc, scope, key, object, type))) {
did_op = true;
break;
}
@ -58,20 +70,23 @@ int opal_db_base_store(const opal_identifier_t *proc,
}
int opal_db_base_store_pointer(const opal_identifier_t *proc,
opal_value_t *kv)
{
bool did_op;
opal_db_active_module_t *mod;
int rc;
if (!opal_db_base.id_set) {
return OPAL_ERR_FATAL;
}
/* cycle thru the active modules until one agrees to perform the op */
did_op = false;
OPAL_LIST_FOREACH(mod, &opal_db_base.store_order, opal_db_active_module_t) {
if (NULL == mod->module->store_pointer) {
continue;
}
if (OPAL_SUCCESS == (rc = mod->module->store_pointer(proc, kv))) {
did_op = true;
break;
}
@ -112,23 +127,33 @@ int opal_db_base_fetch(const opal_identifier_t *proc,
{
bool did_op;
opal_db_active_module_t *mod;
int rc, i;
if (!opal_db_base.id_set) {
return OPAL_ERR_FATAL;
}
/* cycle thru the active modules until one agrees to perform the op */
did_op = false;
/* we cycle thru the list of modules twice - this allows us to check
* a local store first, then attempt to obtain the data from an
* external store that puts it in the local store
*/
for(i=0; i < 2 && !did_op; i++) {
OPAL_LIST_FOREACH(mod, &opal_db_base.fetch_order, opal_db_active_module_t) {
if (NULL == mod->module->fetch) {
continue;
}
if (OPAL_SUCCESS == (rc = mod->module->fetch(proc, key, data, type))) {
did_op = true;
break;
}
/* modules return "next option" if they didn't perform
* the operation - anything else is a true error.
*/
if (OPAL_ERR_TAKE_NEXT_OPTION != rc) {
return rc;
}
}
}
@ -145,23 +170,33 @@ int opal_db_base_fetch_pointer(const opal_identifier_t *proc,
{
bool did_op;
opal_db_active_module_t *mod;
int rc, i;
if (!opal_db_base.id_set) {
return OPAL_ERR_FATAL;
}
/* cycle thru the active modules until one agrees to perform the op */
did_op = false;
/* we cycle thru the list of modules twice - this allows us to check
* a local store first, then attempt to obtain the data from an
* external store that puts it in the local store
*/
for(i=0; i < 2 && !did_op; i++) {
OPAL_LIST_FOREACH(mod, &opal_db_base.fetch_order, opal_db_active_module_t) {
if (NULL == mod->module->fetch_pointer) {
continue;
}
if (OPAL_SUCCESS == (rc = mod->module->fetch_pointer(proc, key, data, type))) {
did_op = true;
break;
}
/* modules return "next option" if they didn't perform
* the operation - anything else is a true error.
*/
if (OPAL_ERR_TAKE_NEXT_OPTION != rc) {
return rc;
}
}
}
@ -173,6 +208,7 @@ int opal_db_base_fetch_pointer(const opal_identifier_t *proc,
}
int opal_db_base_fetch_multiple(const opal_identifier_t *proc,
opal_scope_t scope,
const char *key,
opal_list_t *kvs)
{
@ -180,13 +216,17 @@ int opal_db_base_fetch_multiple(const opal_identifier_t *proc,
opal_db_active_module_t *mod;
int rc;
if (!opal_db_base.id_set) {
return OPAL_ERR_FATAL;
}
/* cycle thru the active modules until one agrees to perform the op */
did_op = false;
OPAL_LIST_FOREACH(mod, &opal_db_base.fetch_order, opal_db_active_module_t) {
if (NULL == mod->module->fetch_multiple) {
continue;
}
if (OPAL_SUCCESS == (rc = mod->module->fetch_multiple(proc, scope, key, kvs))) {
did_op = true;
break;
}

View File

@ -31,6 +31,7 @@
opal_db_base_module_t opal_db = {
NULL,
NULL,
opal_db_base_set_id,
opal_db_base_store,
opal_db_base_store_pointer,
opal_db_base_commit,
@ -53,6 +54,8 @@ static int opal_db_base_close(void)
static int opal_db_base_open(mca_base_open_flag_t flags)
{
opal_db_base.my_id = 0;
opal_db_base.id_set = false;
OBJ_CONSTRUCT(&opal_db_base.fetch_order, opal_list_t);
OBJ_CONSTRUCT(&opal_db_base.store_order, opal_list_t);

View File

@ -35,21 +35,6 @@
BEGIN_C_DECLS
/* define a flag to indicate the scope of data being
* stored in the database. Three options are supported:
*
* GLOBAL: data is to be published such that any proc
* in the job can access it
* LOCAL: data is to be published such that any proc
* on the same node can access it
* INTERNAL: data is to be stored in this app only
*/
typedef enum {
OPAL_DB_GLOBAL,
OPAL_DB_LOCAL,
OPAL_DB_INTERNAL
} opal_db_locality_t;
/*
* Initialize the module
*/
@ -61,26 +46,51 @@ typedef int (*opal_db_base_module_init_fn_t)(void);
typedef void (*opal_db_base_module_finalize_fn_t)(void);
/*
* Store a copy of data in the database - overwrites if already present. The data is
* copied into the database and therefore does not need to be preserved by
* the caller.
* Set the local identifier - pass in an opal_identifier_t value
* that identifies this process. It is used to determine whether or
* not to publish values outside the process. Values stored for
* other processes are never published, as each process is
* responsible for deciding what of its own data to publish
*/
typedef void (*opal_db_base_module_set_id_fn_t)(const opal_identifier_t *proc);
/*
* Store a copy of data in the database - overwrites if already present. The
* data is copied into the database and therefore does not need to be preserved
* by the caller. The scope of the data determines where it is stored:
*
* - if the proc id is NOT my own, or the scope is INTERNAL, then the
* data is stored in my own internal storage system. Data for procs
* other than myself is NEVER published to the outside world
*
* - if the proc id is my own, and the scope is PEER, then the data
* is both stored internally AND pushed to the outside world. If the
* external API supports job-local operations, then the data will
* only be pushed to procs that are in the same job as ourselves.
* Otherwise, the data will be published GLOBAL.
*
* - if the proc id is my own, and the scope is GLOBAL, then the data
* is both stored internally AND pushed to the outside world.
*
*/
typedef int (*opal_db_base_module_store_fn_t)(const opal_identifier_t *proc,
opal_scope_t scope,
const char *key, const void *data,
opal_data_type_t type);
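A short sketch of the rules above (identifiers and key illustrative) - data stored against another proc's id stays internal regardless of the scope given:

/* my own id + GLOBAL scope: cached locally AND published */
opal_db.store(&my_id, OPAL_SCOPE_GLOBAL, "ex.key", "value", OPAL_STRING);
/* a peer's id: cached locally only - never published by me */
opal_db.store(&peer_id, OPAL_SCOPE_GLOBAL, "ex.key", "value", OPAL_STRING);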
/*
* Store a pointer to data in the database - data must be retained by the user.
* This allows users to share data across the code base without consuming
* additional memory, but while retaining local access
* additional memory, but while retaining local access. Scope rules are
* as outlined above
*/
typedef int (*opal_db_base_module_store_pointer_fn_t)(const opal_identifier_t *proc,
opal_value_t *kv);
/*
* Commit data to the database
* Commit data to the database - used to flush locally stored data
* to an external key-value store such as PMI
*/
typedef void (*opal_db_base_module_commit_fn_t)(const opal_identifier_t *proc);
@ -110,9 +120,16 @@ typedef int (*opal_db_base_module_fetch_pointer_fn_t)(const opal_identifier_t *p
* Retrieve multiple data elements
*
* Retrieve data for the given proc associated with the specified key. Wildcards
* are supported here as well (key==NULL implies return all key-value pairs). Caller
* is responsible for releasing the objects on the list. A NULL identifier
* parameter indicates that data for all procs is to be returned. The requested
* scope is matched against the scope under which the data was stored. Note that
* a fetch with GLOBAL scope will return data stored as PEER or NON_PEER, but not
* data stored as INTERNAL. A scope of ALL will return data stored under any scope.
*/
typedef int (*opal_db_base_module_fetch_multiple_fn_t)(const opal_identifier_t *proc,
opal_scope_t scope,
const char *key,
opal_list_t *kvs);
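For instance, ompi_proc_pack (earlier in this commit) uses this call to dump everything shareable about a proc; a condensed sketch, assuming my_name identifies the target proc:

opal_list_t data;
opal_list_item_t *item;
OBJ_CONSTRUCT(&data, opal_list_t);
if (OPAL_SUCCESS == opal_db.fetch_multiple((opal_identifier_t*)&my_name,
                                           OPAL_SCOPE_GLOBAL, NULL, &data)) {
    /* ... pack or inspect the opal_value_t entries ... */
}
while (NULL != (item = opal_list_remove_first(&data))) {
    OBJ_RELEASE(item);
}
OBJ_DESTRUCT(&data);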
@ -141,6 +158,7 @@ typedef int (*opal_db_base_module_add_log_fn_t)(const char *table, const opal_va
struct opal_db_base_module_1_0_0_t {
opal_db_base_module_init_fn_t init;
opal_db_base_module_finalize_fn_t finalize;
opal_db_base_module_set_id_fn_t set_id;
opal_db_base_module_store_fn_t store;
opal_db_base_module_store_pointer_fn_t store_pointer;
opal_db_base_module_commit_fn_t commit;

View File

@ -23,6 +23,10 @@ BEGIN_C_DECLS
typedef uint64_t opal_identifier_t;
/* some OPAL-appropriate key definitions */
#define OPAL_DB_LOCALITY "opal.locality"
#define OPAL_DB_CPUSET "opal.cpuset"
END_C_DECLS
#endif

View File

@ -33,18 +33,19 @@
static int init(void);
static void finalize(void);
static int store(const opal_identifier_t *proc,
opal_scope_t scope,
const char *key, const void *object,
opal_data_type_t type);
static int store_pointer(const opal_identifier_t *proc,
opal_value_t *kv);
static int fetch(const opal_identifier_t *proc,
const char *key, void **data,
opal_data_type_t type);
static int fetch_pointer(const opal_identifier_t *proc,
const char *key,
void **data, opal_data_type_t type);
static int fetch_multiple(const opal_identifier_t *proc,
opal_scope_t scope,
const char *key,
opal_list_t *kvs);
static int remove_data(const opal_identifier_t *proc, const char *key);
@ -52,6 +53,7 @@ static int remove_data(const opal_identifier_t *proc, const char *key);
opal_db_base_module_t opal_db_hash_module = {
init,
finalize,
opal_db_base_set_id,
store,
store_pointer,
NULL,
@ -116,7 +118,7 @@ static void finalize(void)
* container.
*/
static opal_value_t* lookup_keyval(proc_data_t *proc_data,
const char *key)
{
opal_value_t *kv = NULL;
for (kv = (opal_value_t *) opal_list_get_first(&proc_data->data);
@ -132,7 +134,7 @@ static opal_value_t* lookup_keyval(proc_data_t *proc_data,
/**
* Find proc_data_t container associated with given
* opal_identifier_t.
*/
static proc_data_t* lookup_opal_proc(opal_hash_table_t *jtable, opal_identifier_t id)
@ -154,7 +156,7 @@ static proc_data_t* lookup_opal_proc(opal_hash_table_t *jtable, opal_identifier_
}
static int store(const opal_identifier_t *uid,
opal_scope_t scope,
const char *key, const void *data,
opal_data_type_t type)
{
@ -163,6 +165,11 @@ static int store(const opal_identifier_t *uid,
opal_byte_object_t *boptr;
opal_identifier_t id;
/* data must have an assigned scope */
if (OPAL_SCOPE_UNDEF == scope) {
return OPAL_ERR_BAD_PARAM;
}
/* to protect alignment, copy the data across */
memcpy(&id, uid, sizeof(opal_identifier_t));
@ -170,8 +177,8 @@ static int store(const opal_identifier_t *uid,
* if this fell to us, we store it
*/
opal_output_verbose(1, opal_db_base_framework.framework_output,
"db:hash:store storing data for proc %" PRIu64 " at locality %d",
id, (int)locality);
"db:hash:store storing data for proc %" PRIu64 " for scope %d",
id, (int)scope);
/* lookup the proc data object for this proc */
if (NULL == (proc_data = lookup_opal_proc(&hash_data, id))) {
@ -197,6 +204,7 @@ static int store(const opal_identifier_t *uid,
}
kv = OBJ_NEW(opal_value_t);
kv->key = strdup(key);
kv->scope = scope;
opal_list_append(&proc_data->data, &kv->super);
/* the type could come in as an OPAL one (e.g., OPAL_VPID). Since
@ -265,13 +273,17 @@ static int store(const opal_identifier_t *uid,
}
static int store_pointer(const opal_identifier_t *uid,
opal_value_t *kv)
{
proc_data_t *proc_data;
opal_value_t *k2;
opal_identifier_t id;
/* data must have an assigned scope */
if (OPAL_SCOPE_UNDEF == kv->scope) {
return OPAL_ERR_BAD_PARAM;
}
/* to protect alignment, copy the data across */
memcpy(&id, uid, sizeof(opal_identifier_t));
@ -279,8 +291,8 @@ static int store_pointer(const opal_identifier_t *uid,
* if this fell to us, we store it
*/
opal_output_verbose(1, opal_db_base_framework.framework_output,
"db:hash:store storing data for proc %" PRIu64 " at locality %d",
id, (int)locality);
"db:hash:store storing data for proc %" PRIu64 " for scope %d",
id, (int)kv->scope);
/* lookup the proc data object for this proc */
if (NULL == (proc_data = lookup_opal_proc(&hash_data, id))) {
@ -308,7 +320,8 @@ static int store_pointer(const opal_identifier_t *uid,
}
static int fetch(const opal_identifier_t *uid,
const char *key, void **data,
opal_data_type_t type)
{
proc_data_t *proc_data;
opal_value_t *kv;
@ -337,12 +350,8 @@ static int fetch(const opal_identifier_t *uid,
/* find the value */
if (NULL == (kv = lookup_keyval(proc_data, key))) {
/* let them look globally for it */
return OPAL_ERR_TAKE_NEXT_OPTION;
}
/* do the copy and check the type */
@ -433,12 +442,8 @@ static int fetch_pointer(const opal_identifier_t *uid,
/* find the value */
if (NULL == (kv = lookup_keyval(proc_data, key))) {
/* let them look globally for it */
return OPAL_ERR_TAKE_NEXT_OPTION;
}
switch (type) {
@ -487,6 +492,7 @@ static int fetch_pointer(const opal_identifier_t *uid,
}
static int fetch_multiple(const opal_identifier_t *uid,
opal_scope_t scope,
const char *key,
opal_list_t *kvs)
{
@ -515,6 +521,10 @@ static int fetch_multiple(const opal_identifier_t *uid,
for (kv = (opal_value_t*) opal_list_get_first(&proc_data->data);
kv != (opal_value_t*) opal_list_get_end(&proc_data->data);
kv = (opal_value_t*) opal_list_get_next(kv)) {
/* check for a matching scope */
if (!(scope & kv->scope)) {
continue;
}
if (OPAL_SUCCESS != (rc = opal_dss.copy((void**)&kvnew, kv, OPAL_VALUE))) {
OPAL_ERROR_LOG(rc);
return rc;
@ -535,6 +545,10 @@ static int fetch_multiple(const opal_identifier_t *uid,
for (kv = (opal_value_t*) opal_list_get_first(&proc_data->data);
kv != (opal_value_t*) opal_list_get_end(&proc_data->data);
kv = (opal_value_t*) opal_list_get_next(kv)) {
/* check for a matching scope */
if (!(scope & kv->scope)) {
continue;
}
if ((0 < len && 0 == strncmp(srchkey, kv->key, len)) ||
(0 == len && 0 == strcmp(key, kv->key))) {
if (OPAL_SUCCESS != (rc = opal_dss.copy((void**)&kvnew, kv, OPAL_VALUE))) {

View File

@ -40,11 +40,10 @@
static int init(void);
static void finalize(void);
static int store(const opal_identifier_t *id,
opal_scope_t scope,
const char *key, const void *object,
opal_data_type_t type);
static int store_pointer(const opal_identifier_t *proc,
opal_value_t *kv);
static void commit(const opal_identifier_t *proc);
static int fetch(const opal_identifier_t *proc,
@ -53,6 +52,7 @@ static int fetch_pointer(const opal_identifier_t *proc,
const char *key,
void **data, opal_data_type_t type);
static int fetch_multiple(const opal_identifier_t *proc,
opal_scope_t scope,
const char *key,
opal_list_t *kvs);
static int remove_data(const opal_identifier_t *proc, const char *key);
@ -60,6 +60,7 @@ static int remove_data(const opal_identifier_t *proc, const char *key);
opal_db_base_module_t opal_db_pmi_module = {
init,
finalize,
opal_db_base_set_id,
store,
store_pointer,
commit,
@ -326,21 +327,20 @@ static int pmi_get_packed (const opal_identifier_t *uid, char **packed_data, siz
return OPAL_SUCCESS;
}
static void cache_keys_locally(const opal_identifier_t *uid)
{
char *tmp, *tmp2, *tmp3, *tmp_val;
opal_data_type_t stored_type;
size_t len, offset;
int rc, size;
OPAL_OUTPUT_VERBOSE((1, opal_db_base_framework.framework_output,
"db:pmi:fetch get key %s for proc %" PRIu64 " in KVS %s",
key, *uid, pmi_kvs_name));
"db:pmi:fetch get all keys for proc %" PRIu64 " in KVS %s",
*uid, pmi_kvs_name));
rc = pmi_get_packed (uid, &tmp_val, &len);
if (OPAL_SUCCESS != rc) {
return;
}
/* cycle through the decoded data and cache each key locally */
@ -365,28 +365,22 @@ static bool cache_keys_locally (const opal_identifier_t *uid, const char *key)