2012-10-25 17:15:17 +00:00
|
|
|
/*
|
Per the meeting on moving the BTLs to OPAL, move the ORTE database "db" framework to OPAL so the relocated BTLs can access it. Because the data is indexed by process, this requires that we define a new "opal_identifier_t" that corresponds to the orte_process_name_t struct. In order to support multiple run-times, this is defined in opal/mca/db/db_types.h as a uint64_t without identifying the meaning of any part of that data.
A few changes were required to support this move:
1. the PMI component used to identify rte-related data (e.g., host name, bind level) and package them as a unit to reduce the number of PMI keys. This code was moved up to the ORTE layer as the OPAL layer has no understanding of these concepts. In addition, the component locally stored data based on process jobid/vpid - this could no longer be supported (see below for the solution).
2. the hash component was updated to use the new opal_identifier_t instead of orte_process_name_t as its index for storing data in the hash tables. Previously, we did a hash on the vpid and stored the data in a 32-bit hash table. In the revised system, we don't see a separate "vpid" field - we only have a 64-bit opaque value. The orte_process_name_t hash turned out to do nothing useful, so we now store the data in a 64-bit hash table. Preliminary tests didn't show any identifiable change in behavior or performance, but we'll have to see if a move back to the 32-bit table is required at some later time.
3. the db framework was a "select one" system. However, since the PMI component could no longer use its internal storage system, the framework has now been changed to a "select many" mode of operation. This allows the hash component to handle all internal storage, while the PMI component only handles pushing/pulling things from the PMI system. This was something we had planned for some time - when fetching data, we first check internal storage to see if we already have it, and then automatically go to the global system to look for it if we don't. Accordingly, the framework was provided with a custom query function used during "select" that lets you separately specify the "store" and "fetch" ordering.
4. the ORTE grpcomm and ess/pmi components, and the nidmap code, were updated to work with the new db framework and to specify internal/global storage options.
No changes were made to the MPI layer, except for modifying the ORTE component of the OMPI/rte framework to support the new db framework.
This commit was SVN r28112.
2013-02-26 17:50:04 +00:00
|
|
|
* Copyright (c) 2012-2013 Los Alamos National Security, LLC.
|
2012-10-25 17:15:17 +00:00
|
|
|
* All rights reserved.
|
|
|
|
* $COPYRIGHT$
|
|
|
|
*
|
|
|
|
* Additional copyrights may follow
|
|
|
|
*
|
|
|
|
* $HEADER$
|
|
|
|
*/
|
|
|
|
|
|
|
|
#include "orte_config.h"
|
|
|
|
|
|
|
|
#include <sys/types.h>
#include <stdlib.h>
|
|
|
|
#ifdef HAVE_UNISTD_H
|
|
|
|
#include <unistd.h>
|
|
|
|
#endif /* HAVE_UNISTD_H */
|
|
|
|
#ifdef HAVE_STRING_H
|
|
|
|
#include <string.h>
|
|
|
|
#endif
|
|
|
|
#ifdef HAVE_FCNTL_H
|
|
|
|
#include <fcntl.h>
|
|
|
|
#endif
|
2012-10-25 22:23:08 +00:00
|
|
|
#include <sys/stat.h>
|
2012-10-25 17:15:17 +00:00
|
|
|
|
|
|
|
#include "opal/util/if.h"
|
|
|
|
#include "opal/util/output.h"
|
|
|
|
#include "opal/util/uri.h"
|
|
|
|
#include "opal/dss/dss.h"
|
Per the meeting on moving the BTLs to OPAL, move the ORTE database "db" framework to OPAL so the relocated BTLs can access it. Because the data is indexed by process, this requires that we define a new "opal_identifier_t" that corresponds to the orte_process_name_t struct. In order to support multiple run-times, this is defined in opal/mca/db/db_types.h as a uint64_t without identifying the meaning of any part of that data.
A few changes were required to support this move:
1. the PMI component used to identify rte-related data (e.g., host name, bind level) and package them as a unit to reduce the number of PMI keys. This code was moved up to the ORTE layer as the OPAL layer has no understanding of these concepts. In addition, the component locally stored data based on process jobid/vpid - this could no longer be supported (see below for the solution).
2. the hash component was updated to use the new opal_identifier_t instead of orte_process_name_t as its index for storing data in the hash tables. Previously, we did a hash on the vpid and stored the data in a 32-bit hash table. In the revised system, we don't see a separate "vpid" field - we only have a 64-bit opaque value. The orte_process_name_t hash turned out to do nothing useful, so we now store the data in a 64-bit hash table. Preliminary tests didn't show any identifiable change in behavior or performance, but we'll have to see if a move back to the 32-bit table is required at some later time.
3. the db framework was a "select one" system. However, since the PMI component could no longer use its internal storage system, the framework has now been changed to a "select many" mode of operation. This allows the hash component to handle all internal storage, while the PMI component only handles pushing/pulling things from the PMI system. This was something we had planned for some time - when fetching data, we first check internal storage to see if we already have it, and then automatically go to the global system to look for it if we don't. Accordingly, the framework was provided with a custom query function used during "select" that lets you separately specify the "store" and "fetch" ordering.
4. the ORTE grpcomm and ess/pmi components, and the nidmap code, were updated to work with the new db framework and to specify internal/global storage options.
No changes were made to the MPI layer, except for modifying the ORTE component of the OMPI/rte framework to support the new db framework.
This commit was SVN r28112.
2013-02-26 17:50:04 +00:00
|
|
|
#include "opal/mca/db/db.h"
|
2012-10-25 17:15:17 +00:00
|
|
|
|
|
|
|
#include "orte/util/error_strings.h"
|
|
|
|
#include "orte/util/name_fns.h"
|
|
|
|
#include "orte/util/show_help.h"
|
|
|
|
#include "orte/runtime/orte_globals.h"
|
|
|
|
#include "orte/mca/errmgr/errmgr.h"
|
|
|
|
#include "orte/mca/rml/rml.h"
|
|
|
|
|
|
|
|
#include "orte/mca/dfs/base/base.h"
|
|
|
|
#include "dfs_app.h"
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Module functions: Global
|
|
|
|
*/
|
|
|
|
static int init(void);
|
|
|
|
static int finalize(void);
|
|
|
|
|
|
|
|
static void dfs_open(char *uri,
|
|
|
|
orte_dfs_open_callback_fn_t cbfunc,
|
|
|
|
void *cbdata);
|
2012-10-26 15:49:04 +00:00
|
|
|
static void dfs_close(int fd,
|
|
|
|
orte_dfs_close_callback_fn_t cbfunc,
|
|
|
|
void *cbdata);
|
2012-10-25 22:23:08 +00:00
|
|
|
static void dfs_get_file_size(int fd,
|
|
|
|
orte_dfs_size_callback_fn_t cbfunc,
|
|
|
|
void *cbdata);
|
2012-10-26 15:49:04 +00:00
|
|
|
static void dfs_seek(int fd, long offset, int whence,
|
|
|
|
orte_dfs_seek_callback_fn_t cbfunc,
|
|
|
|
void *cbdata);
|
2012-10-25 17:15:17 +00:00
|
|
|
static void dfs_read(int fd, uint8_t *buffer,
|
|
|
|
long length,
|
|
|
|
orte_dfs_read_callback_fn_t cbfunc,
|
|
|
|
void *cbdata);
|
2012-11-10 14:09:12 +00:00
|
|
|
static void dfs_post_file_map(opal_buffer_t *bo,
|
2012-10-29 23:05:45 +00:00
|
|
|
orte_dfs_post_callback_fn_t cbfunc,
|
|
|
|
void *cbdata);
|
|
|
|
static void dfs_get_file_map(orte_process_name_t *target,
|
|
|
|
orte_dfs_fm_callback_fn_t cbfunc,
|
|
|
|
void *cbdata);
|
|
|
|
static void dfs_load_file_maps(orte_jobid_t jobid,
|
2012-11-10 14:09:12 +00:00
|
|
|
opal_buffer_t *bo,
|
2012-10-29 23:05:45 +00:00
|
|
|
orte_dfs_load_callback_fn_t cbfunc,
|
|
|
|
void *cbdata);
|
|
|
|
static void dfs_purge_file_maps(orte_jobid_t jobid,
|
|
|
|
orte_dfs_purge_callback_fn_t cbfunc,
|
|
|
|
void *cbdata);
|
2012-10-25 17:15:17 +00:00
|
|
|
|
|
|
|
/******************
|
|
|
|
* APP module
|
|
|
|
******************/
|
|
|
|
orte_dfs_base_module_t orte_dfs_app_module = {
|
|
|
|
init,
|
|
|
|
finalize,
|
|
|
|
dfs_open,
|
|
|
|
dfs_close,
|
2012-10-25 22:23:08 +00:00
|
|
|
dfs_get_file_size,
|
2012-10-25 17:15:17 +00:00
|
|
|
dfs_seek,
|
2012-10-29 23:05:45 +00:00
|
|
|
dfs_read,
|
|
|
|
dfs_post_file_map,
|
|
|
|
dfs_get_file_map,
|
|
|
|
dfs_load_file_maps,
|
|
|
|
dfs_purge_file_maps
|
2012-10-25 17:15:17 +00:00
|
|
|
};
|
|
|
|
|
|
|
|
static opal_list_t requests, active_files;
|
|
|
|
static int local_fd = 0;
|
|
|
|
static uint64_t req_id = 0;
|
|
|
|
static void recv_dfs(int status, orte_process_name_t* sender,
|
|
|
|
opal_buffer_t* buffer, orte_rml_tag_t tag,
|
|
|
|
void* cbdata);
|
|
|
|
|
|
|
|
static int init(void)
|
|
|
|
{
|
|
|
|
OBJ_CONSTRUCT(&requests, opal_list_t);
|
|
|
|
OBJ_CONSTRUCT(&active_files, opal_list_t);
|
As per the RFC, bring in the ORTE async progress code and the rewrite of OOB:
*** THIS RFC INCLUDES A MINOR CHANGE TO THE MPI-RTE INTERFACE ***
Note: during the course of this work, it was necessary to completely separate the MPI and RTE progress engines. There were multiple places in the MPI layer where ORTE_WAIT_FOR_COMPLETION was being used. A new OMPI_WAIT_FOR_COMPLETION macro was created (defined in ompi/mca/rte/rte.h) that simply cycles across opal_progress until the provided flag becomes false. Places where the MPI layer blocked waiting for RTE to complete an event have been modified to use this macro.
***************************************************************************************
I am reissuing this RFC because of the time that has passed since its original release. Since its initial release and review, I have debugged it further to ensure it fully supports tests like loop_spawn. It therefore seems ready for merge back to the trunk. Given its prior review, I have set the timeout for one week.
The code is in https://bitbucket.org/rhc/ompi-oob2
WHAT: Rewrite of ORTE OOB
WHY: Support asynchronous progress and a host of other features
WHEN: Wed, August 21
SYNOPSIS:
The current OOB has served us well, but a number of limitations have been identified over the years. Specifically:
* it is only progressed when called via opal_progress, which can lead to hangs or recursive calls into libevent (which is not supported by that code)
* we've had issues when multiple NICs are available as the code doesn't "shift" messages between transports - thus, all nodes had to be available via the same TCP interface.
* the OOB "unloads" incoming opal_buffer_t objects during the transmission, thus preventing use of OBJ_RETAIN in the code when repeatedly sending the same message to multiple recipients
* there is no failover mechanism across NICs - if the selected NIC (or its attached switch) fails, we are forced to abort
* only one transport (i.e., component) can be "active"
The revised OOB resolves these problems:
* async progress is used for all application processes, with the progress thread blocking in the event library
* each available TCP NIC is supported by its own TCP module. The ability to asynchronously progress each module independently is provided, but not enabled by default (a runtime MCA parameter turns it "on")
* multi-address TCP NICs (e.g., a NIC with both an IPv4 and IPv6 address, or with virtual interfaces) are supported - reachability is determined by comparing the contact info for a peer against all addresses within the range covered by the address/mask pairs for the NIC.
* a message that arrives on one TCP NIC is automatically shifted to whatever NIC that is connected to the next "hop" if that peer cannot be reached by the incoming NIC. If no TCP module will reach the peer, then the OOB attempts to send the message via all other available components - if none can reach the peer, then an "error" is reported back to the RML, which then calls the errmgr for instructions.
* opal_buffer_t now conforms to standard object rules re OBJ_RETAIN as we no longer "unload" the incoming object
* NIC failure is reported to the TCP component, which then tries to resend the message across any other available TCP NIC. If that doesn't work, then the message is given back to the OOB base to try using other components. If all that fails, then the error is reported to the RML, which reports to the errmgr for instructions
* obviously from the above, multiple OOB components (e.g., TCP and UD) can be active in parallel
* the matching code has been moved to the RML (and out of the OOB/TCP component) so it is independent of transport
* routing is done by the individual OOB modules (as opposed to the RML). Thus, both routed and non-routed transports can simultaneously be active
* all blocking send/recv APIs have been removed. Everything operates asynchronously.
KNOWN LIMITATIONS:
* although provision is made for component failover as described above, the code for doing so has not been fully implemented yet. At the moment, if all connections for a given peer fail, the errmgr is notified of a "lost connection", which by default results in termination of the job if it was a lifeline
* the IPv6 code is present and compiles, but is not complete. Since the current IPv6 support in the OOB doesn't work anyway, I don't consider this a blocker
* routing is performed at the individual module level, yet the active routed component is selected on a global basis. We probably should update that to reflect that different transports may need/choose to route in different ways
* obviously, not every error path has been tested nor necessarily covered
* determining abnormal termination is more challenging than in the old code as we now potentially have multiple ways of connecting to a process. Ideally, we would declare "connection failed" when *all* transports can no longer reach the process, but that requires some additional (possibly complex) code. For now, the code replicates the old behavior only somewhat modified - i.e., if a module sees its connection fail, it checks to see if it is a lifeline. If so, it notifies the errmgr that the lifeline is lost - otherwise, it notifies the errmgr that a non-lifeline connection was lost.
* reachability is determined solely on the basis of a shared subnet address/mask - more sophisticated algorithms (e.g., the one used in the tcp btl) are required to handle routing via gateways
* the RML needs to assign sequence numbers to each message on a per-peer basis. The receiving RML will then deliver messages in order, thus preventing out-of-order messaging in the case where messages travel across different transports or a message needs to be redirected/resent due to failure of a NIC
This commit was SVN r29058.
2013-08-22 16:37:40 +00:00
|
|
|
orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD,
|
|
|
|
ORTE_RML_TAG_DFS_DATA,
|
|
|
|
ORTE_RML_PERSISTENT,
|
|
|
|
recv_dfs,
|
|
|
|
NULL);
|
|
|
|
return ORTE_SUCCESS;
|
2012-10-25 17:15:17 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
static int finalize(void)
|
|
|
|
{
|
|
|
|
opal_list_item_t *item;
|
|
|
|
|
2012-10-25 22:23:08 +00:00
|
|
|
orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_DFS_DATA);
|
2012-10-25 17:15:17 +00:00
|
|
|
while (NULL != (item = opal_list_remove_first(&requests))) {
|
|
|
|
OBJ_RELEASE(item);
|
|
|
|
}
|
|
|
|
OBJ_DESTRUCT(&requests);
|
|
|
|
while (NULL != (item = opal_list_remove_first(&active_files))) {
|
|
|
|
OBJ_RELEASE(item);
|
|
|
|
}
|
|
|
|
OBJ_DESTRUCT(&active_files);
|
|
|
|
return ORTE_SUCCESS;
|
|
|
|
}
|
|
|
|
|
|
|
|
/* receives take place in an event, so we are free to process
|
|
|
|
* the request list without fear of getting things out-of-order
|
|
|
|
*/
|
|
|
|
static void recv_dfs(int status, orte_process_name_t* sender,
|
|
|
|
opal_buffer_t* buffer, orte_rml_tag_t tag,
|
|
|
|
void* cbdata)
|
|
|
|
{
|
|
|
|
orte_dfs_cmd_t cmd;
|
|
|
|
int32_t cnt;
|
|
|
|
orte_dfs_request_t *dfs, *dptr;
|
|
|
|
opal_list_item_t *item;
|
|
|
|
int remote_fd, rc;
|
|
|
|
int64_t i64;
|
|
|
|
uint64_t rid;
|
|
|
|
orte_dfs_tracker_t *trk;
|
|
|
|
|
|
|
|
/* unpack the command this message is responding to */
|
|
|
|
cnt = 1;
|
|
|
|
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &cmd, &cnt, ORTE_DFS_CMD_T))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
2013-03-27 21:14:43 +00:00
|
|
|
opal_output_verbose(1, orte_dfs_base_framework.framework_output,
|
2012-10-25 17:15:17 +00:00
|
|
|
"%s recvd cmd %d from sender %s",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)cmd,
|
|
|
|
ORTE_NAME_PRINT(sender));
|
|
|
|
|
|
|
|
switch (cmd) {
|
|
|
|
case ORTE_DFS_OPEN_CMD:
|
|
|
|
/* unpack the request id */
|
|
|
|
cnt = 1;
|
|
|
|
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
/* unpack the remote fd */
|
|
|
|
cnt = 1;
|
|
|
|
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &remote_fd, &cnt, OPAL_INT))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
/* search our list of requests to find the matching one */
|
|
|
|
dfs = NULL;
|
|
|
|
for (item = opal_list_get_first(&requests);
|
|
|
|
item != opal_list_get_end(&requests);
|
|
|
|
item = opal_list_get_next(item)) {
|
|
|
|
dptr = (orte_dfs_request_t*)item;
|
|
|
|
if (dptr->id == rid) {
|
|
|
|
/* as the request has been fulfilled, remove it */
|
|
|
|
opal_list_remove_item(&requests, item);
|
|
|
|
dfs = dptr;
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if (NULL == dfs) {
|
2013-03-27 21:14:43 +00:00
|
|
|
opal_output_verbose(1, orte_dfs_base_framework.framework_output,
|
2012-10-25 17:15:17 +00:00
|
|
|
"%s recvd open file - no corresponding request found for local fd %d",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), local_fd);
|
|
|
|
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
/* if the remote_fd < 0, then we had an error, so return
|
|
|
|
* the error value to the caller
|
|
|
|
*/
|
|
|
|
if (remote_fd < 0) {
|
2013-03-27 21:14:43 +00:00
|
|
|
opal_output_verbose(1, orte_dfs_base_framework.framework_output,
|
2012-10-25 17:15:17 +00:00
|
|
|
"%s recvd open file response error file %s [error: %d]",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
|
|
dfs->uri, remote_fd);
|
|
|
|
if (NULL != dfs->open_cbfunc) {
|
|
|
|
dfs->open_cbfunc(remote_fd, dfs->cbdata);
|
|
|
|
}
|
|
|
|
/* release the request */
|
|
|
|
OBJ_RELEASE(dfs);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
/* otherwise, create a tracker for this file */
|
|
|
|
trk = OBJ_NEW(orte_dfs_tracker_t);
|
|
|
|
trk->requestor.jobid = ORTE_PROC_MY_NAME->jobid;
|
|
|
|
trk->requestor.vpid = ORTE_PROC_MY_NAME->vpid;
|
|
|
|
trk->host_daemon.jobid = sender->jobid;
|
|
|
|
trk->host_daemon.vpid = sender->vpid;
|
2012-11-12 02:54:53 +00:00
|
|
|
trk->uri = strdup(dfs->uri);
|
|
|
|
/* break the uri down into scheme and filename */
|
|
|
|
trk->scheme = opal_uri_get_scheme(dfs->uri);
|
|
|
|
trk->filename = opal_filename_from_uri(dfs->uri, NULL);
|
2012-10-25 17:15:17 +00:00
|
|
|
/* define the local fd */
|
|
|
|
trk->local_fd = local_fd++;
|
|
|
|
/* record the remote file descriptor */
|
|
|
|
trk->remote_fd = remote_fd;
|
|
|
|
/* add it to our list of active files */
|
|
|
|
opal_list_append(&active_files, &trk->super);
|
|
|
|
/* return the local_fd to the caller for
|
|
|
|
* subsequent operations
|
|
|
|
*/
|
2013-03-27 21:14:43 +00:00
|
|
|
opal_output_verbose(1, orte_dfs_base_framework.framework_output,
|
2012-10-25 17:15:17 +00:00
|
|
|
"%s recvd open file completed for file %s [local fd: %d remote fd: %d]",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
|
|
dfs->uri, trk->local_fd, remote_fd);
|
|
|
|
if (NULL != dfs->open_cbfunc) {
|
|
|
|
dfs->open_cbfunc(trk->local_fd, dfs->cbdata);
|
|
|
|
}
|
|
|
|
/* release the request */
|
|
|
|
OBJ_RELEASE(dfs);
|
|
|
|
break;
|
|
|
|
|
2012-10-25 22:23:08 +00:00
|
|
|
case ORTE_DFS_SIZE_CMD:
|
|
|
|
/* unpack the request id for this request */
|
|
|
|
cnt = 1;
|
|
|
|
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
/* search our list of requests to find the matching one */
|
|
|
|
dfs = NULL;
|
|
|
|
for (item = opal_list_get_first(&requests);
|
|
|
|
item != opal_list_get_end(&requests);
|
|
|
|
item = opal_list_get_next(item)) {
|
|
|
|
dptr = (orte_dfs_request_t*)item;
|
|
|
|
if (dptr->id == rid) {
|
|
|
|
/* request was fulfilled, so remove it */
|
|
|
|
opal_list_remove_item(&requests, item);
|
|
|
|
dfs = dptr;
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if (NULL == dfs) {
|
2013-03-27 21:14:43 +00:00
|
|
|
opal_output_verbose(1, orte_dfs_base_framework.framework_output,
|
2012-10-25 22:23:08 +00:00
|
|
|
"%s recvd size - no corresponding request found for local fd %d",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), local_fd);
|
|
|
|
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
/* get the size */
|
|
|
|
cnt = 1;
|
|
|
|
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &i64, &cnt, OPAL_INT64))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
OBJ_RELEASE(dfs);
|
|
|
|
return;
|
|
|
|
}
|
2012-10-26 15:49:04 +00:00
|
|
|
/* pass it back to the original caller */
|
2012-10-25 22:23:08 +00:00
|
|
|
if (NULL != dfs->size_cbfunc) {
|
|
|
|
dfs->size_cbfunc(i64, dfs->cbdata);
|
|
|
|
}
|
|
|
|
/* release the request */
|
|
|
|
OBJ_RELEASE(dfs);
|
|
|
|
break;
|
|
|
|
|
2012-10-26 15:49:04 +00:00
|
|
|
case ORTE_DFS_SEEK_CMD:
|
|
|
|
/* unpack the request id for this read */
|
|
|
|
cnt = 1;
|
|
|
|
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
/* search our list of requests to find the matching one */
|
|
|
|
dfs = NULL;
|
|
|
|
for (item = opal_list_get_first(&requests);
|
|
|
|
item != opal_list_get_end(&requests);
|
|
|
|
item = opal_list_get_next(item)) {
|
|
|
|
dptr = (orte_dfs_request_t*)item;
|
|
|
|
if (dptr->id == rid) {
|
|
|
|
/* request was fulfilled, so remove it */
|
|
|
|
opal_list_remove_item(&requests, item);
|
|
|
|
dfs = dptr;
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if (NULL == dfs) {
|
2013-03-27 21:14:43 +00:00
|
|
|
opal_output_verbose(1, orte_dfs_base_framework.framework_output,
|
2012-10-26 15:49:04 +00:00
|
|
|
"%s recvd seek - no corresponding request found for local fd %d",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), local_fd);
|
|
|
|
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
/* get the returned offset/status */
|
|
|
|
cnt = 1;
|
|
|
|
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &i64, &cnt, OPAL_INT64))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
OBJ_RELEASE(dfs);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
/* pass it back to the original caller */
|
|
|
|
if (NULL != dfs->seek_cbfunc) {
|
|
|
|
dfs->seek_cbfunc(i64, dfs->cbdata);
|
|
|
|
}
|
|
|
|
/* release the request */
|
|
|
|
OBJ_RELEASE(dfs);
|
|
|
|
break;
|
|
|
|
|
2012-10-25 17:15:17 +00:00
|
|
|
case ORTE_DFS_READ_CMD:
|
|
|
|
/* unpack the request id for this read */
|
|
|
|
cnt = 1;
|
|
|
|
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
/* search our list of requests to find the matching one */
|
|
|
|
dfs = NULL;
|
|
|
|
for (item = opal_list_get_first(&requests);
|
|
|
|
item != opal_list_get_end(&requests);
|
|
|
|
item = opal_list_get_next(item)) {
|
|
|
|
dptr = (orte_dfs_request_t*)item;
|
|
|
|
if (dptr->id == rid) {
|
|
|
|
/* request was fulfilled, so remove it */
|
|
|
|
opal_list_remove_item(&requests, item);
|
|
|
|
dfs = dptr;
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if (NULL == dfs) {
|
2013-03-27 21:14:43 +00:00
|
|
|
opal_output_verbose(1, orte_dfs_base_framework.framework_output,
|
2012-10-25 17:15:17 +00:00
|
|
|
"%s recvd read - no corresponding request found for local fd %d",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), local_fd);
|
|
|
|
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
/* get the bytes read */
|
|
|
|
cnt = 1;
|
|
|
|
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &i64, &cnt, OPAL_INT64))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
OBJ_RELEASE(dfs);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
if (0 < i64) {
|
|
|
|
cnt = i64;
|
|
|
|
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, dfs->read_buffer, &cnt, OPAL_UINT8))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
OBJ_RELEASE(dfs);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
/* pass them back to the original caller */
|
|
|
|
if (NULL != dfs->read_cbfunc) {
|
|
|
|
dfs->read_cbfunc(i64, dfs->read_buffer, dfs->cbdata);
|
|
|
|
}
|
|
|
|
/* release the request */
|
|
|
|
OBJ_RELEASE(dfs);
|
|
|
|
break;
|
|
|
|
|
2012-10-29 23:05:45 +00:00
|
|
|
case ORTE_DFS_POST_CMD:
|
|
|
|
/* unpack the request id for this read */
|
|
|
|
cnt = 1;
|
|
|
|
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
/* search our list of requests to find the matching one */
|
|
|
|
dfs = NULL;
|
|
|
|
for (item = opal_list_get_first(&requests);
|
|
|
|
item != opal_list_get_end(&requests);
|
|
|
|
item = opal_list_get_next(item)) {
|
|
|
|
dptr = (orte_dfs_request_t*)item;
|
|
|
|
if (dptr->id == rid) {
|
|
|
|
/* request was fulfilled, so remove it */
|
|
|
|
opal_list_remove_item(&requests, item);
|
|
|
|
dfs = dptr;
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if (NULL == dfs) {
|
2013-03-27 21:14:43 +00:00
|
|
|
opal_output_verbose(1, orte_dfs_base_framework.framework_output,
|
2012-10-29 23:05:45 +00:00
|
|
|
"%s recvd post - no corresponding request found",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
|
|
|
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
if (NULL != dfs->post_cbfunc) {
|
|
|
|
dfs->post_cbfunc(dfs->cbdata);
|
|
|
|
}
|
|
|
|
OBJ_RELEASE(dfs);
|
|
|
|
break;
|
|
|
|
|
|
|
|
case ORTE_DFS_GETFM_CMD:
|
|
|
|
/* unpack the request id for this read */
|
|
|
|
cnt = 1;
|
|
|
|
if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &rid, &cnt, OPAL_UINT64))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
/* search our list of requests to find the matching one */
|
|
|
|
dfs = NULL;
|
|
|
|
for (item = opal_list_get_first(&requests);
|
|
|
|
item != opal_list_get_end(&requests);
|
|
|
|
item = opal_list_get_next(item)) {
|
|
|
|
dptr = (orte_dfs_request_t*)item;
|
|
|
|
if (dptr->id == rid) {
|
|
|
|
/* request was fulfilled, so remove it */
|
|
|
|
opal_list_remove_item(&requests, item);
|
|
|
|
dfs = dptr;
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if (NULL == dfs) {
|
2013-03-27 21:14:43 +00:00
|
|
|
opal_output_verbose(1, orte_dfs_base_framework.framework_output,
|
2012-10-29 23:05:45 +00:00
|
|
|
"%s recvd getfm - no corresponding request found",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME));
|
|
|
|
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
/* return it to caller */
|
|
|
|
if (NULL != dfs->fm_cbfunc) {
|
2012-11-10 14:09:12 +00:00
|
|
|
dfs->fm_cbfunc(buffer, dfs->cbdata);
|
2012-10-29 23:05:45 +00:00
|
|
|
}
|
|
|
|
OBJ_RELEASE(dfs);
|
|
|
|
break;
|
|
|
|
|
2012-10-25 17:15:17 +00:00
|
|
|
default:
|
|
|
|
opal_output(0, "APP:DFS:RECV WTF");
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
 * Open, read-only, the file named by dfs->uri on the local filesystem.
 *
 * On success a tracker recording the uri, scheme, filename and the
 * local/remote descriptors is appended to active_files, and the new
 * local fd is passed to dfs->open_cbfunc (when set).  On failure the
 * callback receives a negative value instead.
 *
 * The request itself is never released here - that is the caller's job
 * on every path.
 */
static void open_local_file(orte_dfs_request_t *dfs)
{
    char *filename;
    orte_dfs_tracker_t *trk;

    /* extract the filename from the uri */
    if (NULL == (filename = opal_filename_from_uri(dfs->uri, NULL))) {
        /* something wrong - error was reported, so just get out.
         * Do not release the request: the caller releases it after we
         * return, so doing it here as well would be a double release
         */
        if (NULL != dfs->open_cbfunc) {
            dfs->open_cbfunc(-1, dfs->cbdata);
        }
        return;
    }
    opal_output_verbose(1, orte_dfs_base_framework.framework_output,
                        "%s opening local file %s",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        filename);
    /* attempt to open the file */
    if (0 > (dfs->remote_fd = open(filename, O_RDONLY))) {
        ORTE_ERROR_LOG(ORTE_ERR_FILE_OPEN_FAILURE);
        if (NULL != dfs->open_cbfunc) {
            dfs->open_cbfunc(dfs->remote_fd, dfs->cbdata);
        }
        /* nobody else holds the extracted filename, so drop it here */
        free(filename);
        return;
    }
    /* otherwise, create a tracker for this file */
    trk = OBJ_NEW(orte_dfs_tracker_t);
    trk->requestor.jobid = ORTE_PROC_MY_NAME->jobid;
    trk->requestor.vpid = ORTE_PROC_MY_NAME->vpid;
    trk->uri = strdup(dfs->uri);
    /* break the uri down into scheme and filename */
    trk->scheme = opal_uri_get_scheme(dfs->uri);
    /* hand the extracted string to the tracker rather than duplicating
     * it and leaking the original; recv_dfs stores the result of
     * opal_filename_from_uri in the tracker the same way
     */
    trk->filename = filename;
    /* define the local fd */
    trk->local_fd = local_fd++;
    /* record the remote file descriptor */
    trk->remote_fd = dfs->remote_fd;
    /* add it to our list of active files */
    opal_list_append(&active_files, &trk->super);
    /* the file is locally hosted */
    trk->host_daemon.jobid = ORTE_PROC_MY_DAEMON->jobid;
    trk->host_daemon.vpid = ORTE_PROC_MY_DAEMON->vpid;
    opal_output_verbose(1, orte_dfs_base_framework.framework_output,
                        "%s local file %s mapped localfd %d to remotefd %d",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        trk->filename, trk->local_fd, trk->remote_fd);
    /* let the caller know */
    if (NULL != dfs->open_cbfunc) {
        dfs->open_cbfunc(trk->local_fd, dfs->cbdata);
    }
    /* request will be released by the calling routine */
}
|
|
|
|
|
|
|
|
static void process_opens(int fd, short args, void *cbdata)
|
|
|
|
{
|
|
|
|
orte_dfs_request_t *dfs = (orte_dfs_request_t*)cbdata;
|
|
|
|
int rc;
|
|
|
|
opal_buffer_t *buffer;
|
2012-11-16 04:04:29 +00:00
|
|
|
char *scheme, *host, *filename;
|
2012-10-25 17:15:17 +00:00
|
|
|
orte_process_name_t daemon;
|
2012-11-16 04:04:29 +00:00
|
|
|
orte_vpid_t *v;
|
2012-10-25 17:15:17 +00:00
|
|
|
|
|
|
|
/* get the scheme to determine if we can process locally or not */
|
|
|
|
if (NULL == (scheme = opal_uri_get_scheme(dfs->uri))) {
|
2012-11-10 14:09:12 +00:00
|
|
|
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
|
|
|
|
goto complete;
|
2012-10-25 17:15:17 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
if (0 == strcmp(scheme, "nfs")) {
|
|
|
|
open_local_file(dfs);
|
2012-11-10 14:09:12 +00:00
|
|
|
/* the callback was done in the above function */
|
|
|
|
OBJ_RELEASE(dfs);
|
|
|
|
return;
|
2012-10-25 17:15:17 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
if (0 != strcmp(scheme, "file")) {
|
|
|
|
/* not yet supported */
|
|
|
|
orte_show_help("orte_dfs_help.txt", "unsupported-filesystem",
|
|
|
|
true, dfs->uri);
|
|
|
|
goto complete;
|
|
|
|
}
|
|
|
|
|
|
|
|
/* dissect the uri to extract host and filename/path */
|
|
|
|
if (NULL == (filename = opal_filename_from_uri(dfs->uri, &host))) {
|
|
|
|
goto complete;
|
|
|
|
}
|
2012-11-10 14:09:12 +00:00
|
|
|
if (NULL == host) {
|
|
|
|
host = strdup(orte_process_info.nodename);
|
|
|
|
}
|
|
|
|
|
2012-10-25 17:15:17 +00:00
|
|
|
/* if the host is our own, then treat it as a local file */
|
2012-11-16 04:04:29 +00:00
|
|
|
if (0 == strcmp(host, orte_process_info.nodename) ||
|
2012-10-25 17:15:17 +00:00
|
|
|
0 == strcmp(host, "localhost") ||
|
|
|
|
opal_ifislocal(host)) {
|
2013-03-27 21:14:43 +00:00
|
|
|
opal_output_verbose(1, orte_dfs_base_framework.framework_output,
|
2012-10-25 17:15:17 +00:00
|
|
|
"%s file %s on local host",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
|
|
filename);
|
|
|
|
open_local_file(dfs);
|
2012-11-10 14:09:12 +00:00
|
|
|
/* the callback was done in the above function */
|
|
|
|
OBJ_RELEASE(dfs);
|
|
|
|
return;
|
2012-10-25 17:15:17 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/* ident the daemon on that host */
|
|
|
|
daemon.jobid = ORTE_PROC_MY_DAEMON->jobid;
|
2012-11-16 04:04:29 +00:00
|
|
|
/* fetch the daemon for this hostname */
|
2013-03-27 21:14:43 +00:00
|
|
|
opal_output_verbose(1, orte_dfs_base_framework.framework_output,
|
2012-11-16 04:04:29 +00:00
|
|
|
"%s looking for daemon on host %s",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), host);
|
|
|
|
v = &daemon.vpid;
|
2013-04-08 23:34:16 +00:00
|
|
|
if (ORTE_SUCCESS != (rc = opal_db.fetch((opal_identifier_t*)ORTE_NAME_WILDCARD, host, (void**)&v, ORTE_VPID))) {
|
2012-11-16 04:04:29 +00:00
|
|
|
ORTE_ERROR_LOG(rc);
|
2012-10-25 17:15:17 +00:00
|
|
|
goto complete;
|
|
|
|
}
|
2012-11-16 04:04:29 +00:00
|
|
|
|
2013-03-27 21:14:43 +00:00
|
|
|
opal_output_verbose(1, orte_dfs_base_framework.framework_output,
|
2012-10-25 17:15:17 +00:00
|
|
|
"%s file %s on host %s daemon %s",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
|
|
filename, host, ORTE_NAME_PRINT(&daemon));
|
2012-11-12 02:54:53 +00:00
|
|
|
|
2012-10-25 17:15:17 +00:00
|
|
|
/* double-check: if it is our local daemon, then we
|
|
|
|
* treat this as local
|
|
|
|
*/
|
|
|
|
if (daemon.vpid == ORTE_PROC_MY_DAEMON->vpid) {
|
2013-03-27 21:14:43 +00:00
|
|
|
opal_output_verbose(1, orte_dfs_base_framework.framework_output,
|
2012-10-25 17:15:17 +00:00
|
|
|
"%s local file %s on same daemon",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
|
|
filename);
|
|
|
|
open_local_file(dfs);
|
2012-11-10 14:09:12 +00:00
|
|
|
/* the callback was done in the above function */
|
|
|
|
OBJ_RELEASE(dfs);
|
|
|
|
return;
|
2012-10-25 17:15:17 +00:00
|
|
|
}
|
2012-11-12 02:54:53 +00:00
|
|
|
|
2012-10-25 17:15:17 +00:00
|
|
|
/* add this request to our local list so we can
|
|
|
|
* match it with the returned response when it comes
|
|
|
|
*/
|
|
|
|
dfs->id = req_id++;
|
|
|
|
opal_list_append(&requests, &dfs->super);
|
|
|
|
|
|
|
|
/* setup a message for the daemon telling
|
|
|
|
* them what file we want to access
|
|
|
|
*/
|
|
|
|
buffer = OBJ_NEW(opal_buffer_t);
|
|
|
|
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->cmd, 1, ORTE_DFS_CMD_T))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
opal_list_remove_item(&requests, &dfs->super);
|
|
|
|
goto complete;
|
|
|
|
}
|
|
|
|
/* pass the request id */
|
|
|
|
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->id, 1, OPAL_UINT64))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
opal_list_remove_item(&requests, &dfs->super);
|
|
|
|
goto complete;
|
|
|
|
}
|
|
|
|
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &filename, 1, OPAL_STRING))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
opal_list_remove_item(&requests, &dfs->super);
|
|
|
|
goto complete;
|
|
|
|
}
|
|
|
|
|
2013-03-27 21:14:43 +00:00
|
|
|
opal_output_verbose(1, orte_dfs_base_framework.framework_output,
|
2012-10-25 17:15:17 +00:00
|
|
|
"%s sending open file request to %s file %s",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
|
|
ORTE_NAME_PRINT(&daemon),
|
|
|
|
filename);
|
|
|
|
/* send it */
|
|
|
|
if (0 > (rc = orte_rml.send_buffer_nb(&daemon, buffer,
|
As per the RFC, bring in the ORTE async progress code and the rewrite of OOB:
*** THIS RFC INCLUDES A MINOR CHANGE TO THE MPI-RTE INTERFACE ***
Note: during the course of this work, it was necessary to completely separate the MPI and RTE progress engines. There were multiple places in the MPI layer where ORTE_WAIT_FOR_COMPLETION was being used. A new OMPI_WAIT_FOR_COMPLETION macro was created (defined in ompi/mca/rte/rte.h) that simply cycles across opal_progress until the provided flag becomes false. Places where the MPI layer blocked waiting for RTE to complete an event have been modified to use this macro.
***************************************************************************************
I am reissuing this RFC because of the time that has passed since its original release. Since its initial release and review, I have debugged it further to ensure it fully supports tests like loop_spawn. It therefore seems ready for merge back to the trunk. Given its prior review, I have set the timeout for one week.
The code is in https://bitbucket.org/rhc/ompi-oob2
WHAT: Rewrite of ORTE OOB
WHY: Support asynchronous progress and a host of other features
WHEN: Wed, August 21
SYNOPSIS:
The current OOB has served us well, but a number of limitations have been identified over the years. Specifically:
* it is only progressed when called via opal_progress, which can lead to hangs or recursive calls into libevent (which is not supported by that code)
* we've had issues when multiple NICs are available as the code doesn't "shift" messages between transports - thus, all nodes had to be available via the same TCP interface.
* the OOB "unloads" incoming opal_buffer_t objects during the transmission, thus preventing use of OBJ_RETAIN in the code when repeatedly sending the same message to multiple recipients
* there is no failover mechanism across NICs - if the selected NIC (or its attached switch) fails, we are forced to abort
* only one transport (i.e., component) can be "active"
The revised OOB resolves these problems:
* async progress is used for all application processes, with the progress thread blocking in the event library
* each available TCP NIC is supported by its own TCP module. The ability to asynchronously progress each module independently is provided, but not enabled by default (a runtime MCA parameter turns it "on")
* multi-address TCP NICs (e.g., a NIC with both an IPv4 and IPv6 address, or with virtual interfaces) are supported - reachability is determined by comparing the contact info for a peer against all addresses within the range covered by the address/mask pairs for the NIC.
* a message that arrives on one TCP NIC is automatically shifted to whatever NIC that is connected to the next "hop" if that peer cannot be reached by the incoming NIC. If no TCP module will reach the peer, then the OOB attempts to send the message via all other available components - if none can reach the peer, then an "error" is reported back to the RML, which then calls the errmgr for instructions.
* opal_buffer_t now conforms to standard object rules re OBJ_RETAIN as we no longer "unload" the incoming object
* NIC failure is reported to the TCP component, which then tries to resend the message across any other available TCP NIC. If that doesn't work, then the message is given back to the OOB base to try using other components. If all that fails, then the error is reported to the RML, which reports to the errmgr for instructions
* obviously from the above, multiple OOB components (e.g., TCP and UD) can be active in parallel
* the matching code has been moved to the RML (and out of the OOB/TCP component) so it is independent of transport
* routing is done by the individual OOB modules (as opposed to the RML). Thus, both routed and non-routed transports can simultaneously be active
* all blocking send/recv APIs have been removed. Everything operates asynchronously.
KNOWN LIMITATIONS:
* although provision is made for component failover as described above, the code for doing so has not been fully implemented yet. At the moment, if all connections for a given peer fail, the errmgr is notified of a "lost connection", which by default results in termination of the job if it was a lifeline
* the IPv6 code is present and compiles, but is not complete. Since the current IPv6 support in the OOB doesn't work anyway, I don't consider this a blocker
* routing is performed at the individual module level, yet the active routed component is selected on a global basis. We probably should update that to reflect that different transports may need/choose to route in different ways
* obviously, not every error path has been tested nor necessarily covered
* determining abnormal termination is more challenging than in the old code as we now potentially have multiple ways of connecting to a process. Ideally, we would declare "connection failed" when *all* transports can no longer reach the process, but that requires some additional (possibly complex) code. For now, the code replicates the old behavior only somewhat modified - i.e., if a module sees its connection fail, it checks to see if it is a lifeline. If so, it notifies the errmgr that the lifeline is lost - otherwise, it notifies the errmgr that a non-lifeline connection was lost.
* reachability is determined solely on the basis of a shared subnet address/mask - more sophisticated algorithms (e.g., the one used in the tcp btl) are required to handle routing via gateways
* the RML needs to assign sequence numbers to each message on a per-peer basis. The receiving RML will then deliver messages in order, thus preventing out-of-order messaging in the case where messages travel across different transports or a message needs to be redirected/resent due to failure of a NIC
This commit was SVN r29058.
2013-08-22 16:37:40 +00:00
|
|
|
ORTE_RML_TAG_DFS_CMD,
|
2012-10-25 17:15:17 +00:00
|
|
|
orte_rml_send_callback, NULL))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
OBJ_RELEASE(buffer);
|
|
|
|
opal_list_remove_item(&requests, &dfs->super);
|
|
|
|
goto complete;
|
|
|
|
}
|
|
|
|
/* don't release it */
|
|
|
|
return;
|
|
|
|
|
|
|
|
complete:
|
2012-11-10 14:09:12 +00:00
|
|
|
/* we get here if an error occurred - execute any
|
|
|
|
* pending callback so the proc doesn't hang
|
|
|
|
*/
|
|
|
|
if (NULL != dfs->open_cbfunc) {
|
|
|
|
dfs->open_cbfunc(-1, dfs->cbdata);
|
|
|
|
}
|
2012-10-25 17:15:17 +00:00
|
|
|
OBJ_RELEASE(dfs);
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/* in order to handle the possible opening/reading of files by
|
|
|
|
* multiple threads, we have to ensure that all operations are
|
|
|
|
* carried out in events - so the "open" cmd simply posts an
|
|
|
|
* event containing the required info, and then returns
|
|
|
|
*/
|
|
|
|
static void dfs_open(char *uri,
|
|
|
|
orte_dfs_open_callback_fn_t cbfunc,
|
|
|
|
void *cbdata)
|
|
|
|
{
|
|
|
|
orte_dfs_request_t *dfs;
|
|
|
|
|
2013-03-27 21:14:43 +00:00
|
|
|
opal_output_verbose(1, orte_dfs_base_framework.framework_output,
|
2012-10-25 17:15:17 +00:00
|
|
|
"%s opening file %s",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), uri);
|
|
|
|
|
|
|
|
/* setup the request */
|
|
|
|
dfs = OBJ_NEW(orte_dfs_request_t);
|
|
|
|
dfs->cmd = ORTE_DFS_OPEN_CMD;
|
|
|
|
dfs->uri = strdup(uri);
|
|
|
|
dfs->open_cbfunc = cbfunc;
|
|
|
|
dfs->cbdata = cbdata;
|
|
|
|
|
|
|
|
/* post it for processing */
|
|
|
|
ORTE_DFS_POST_REQUEST(dfs, process_opens);
|
|
|
|
}
|
|
|
|
|
|
|
|
static void process_close(int fd, short args, void *cbdata)
|
|
|
|
{
|
|
|
|
orte_dfs_request_t *close_dfs = (orte_dfs_request_t*)cbdata;
|
|
|
|
orte_dfs_tracker_t *tptr, *trk;
|
|
|
|
opal_list_item_t *item;
|
|
|
|
opal_buffer_t *buffer;
|
|
|
|
int rc;
|
|
|
|
|
2013-03-27 21:14:43 +00:00
|
|
|
opal_output_verbose(1, orte_dfs_base_framework.framework_output,
|
2012-10-25 17:15:17 +00:00
|
|
|
"%s closing fd %d",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
|
|
close_dfs->local_fd);
|
|
|
|
|
|
|
|
/* look in our local records for this fd */
|
|
|
|
trk = NULL;
|
|
|
|
for (item = opal_list_get_first(&active_files);
|
|
|
|
item != opal_list_get_end(&active_files);
|
|
|
|
item = opal_list_get_next(item)) {
|
|
|
|
tptr = (orte_dfs_tracker_t*)item;
|
|
|
|
if (tptr->local_fd == close_dfs->local_fd) {
|
|
|
|
trk = tptr;
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if (NULL == trk) {
|
|
|
|
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
2012-10-26 15:49:04 +00:00
|
|
|
if (NULL != close_dfs->close_cbfunc) {
|
|
|
|
close_dfs->close_cbfunc(close_dfs->local_fd, close_dfs->cbdata);
|
|
|
|
}
|
2012-10-25 17:15:17 +00:00
|
|
|
OBJ_RELEASE(close_dfs);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
/* if the file is local, close it */
|
|
|
|
if (trk->host_daemon.vpid == ORTE_PROC_MY_DAEMON->vpid) {
|
|
|
|
close(trk->remote_fd);
|
|
|
|
goto complete;
|
|
|
|
}
|
|
|
|
|
|
|
|
/* setup a message for the daemon telling
|
|
|
|
* them what file to close
|
|
|
|
*/
|
|
|
|
buffer = OBJ_NEW(opal_buffer_t);
|
|
|
|
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &close_dfs->cmd, 1, ORTE_DFS_CMD_T))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
goto complete;
|
|
|
|
}
|
|
|
|
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &trk->remote_fd, 1, OPAL_INT))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
goto complete;
|
|
|
|
}
|
|
|
|
|
2013-03-27 21:14:43 +00:00
|
|
|
opal_output_verbose(1, orte_dfs_base_framework.framework_output,
|
2012-10-25 17:15:17 +00:00
|
|
|
"%s sending close file request to %s for fd %d",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
|
|
ORTE_NAME_PRINT(&trk->host_daemon),
|
|
|
|
trk->local_fd);
|
|
|
|
/* send it */
|
|
|
|
if (0 > (rc = orte_rml.send_buffer_nb(&trk->host_daemon, buffer,
|
As per the RFC, bring in the ORTE async progress code and the rewrite of OOB:
*** THIS RFC INCLUDES A MINOR CHANGE TO THE MPI-RTE INTERFACE ***
Note: during the course of this work, it was necessary to completely separate the MPI and RTE progress engines. There were multiple places in the MPI layer where ORTE_WAIT_FOR_COMPLETION was being used. A new OMPI_WAIT_FOR_COMPLETION macro was created (defined in ompi/mca/rte/rte.h) that simply cycles across opal_progress until the provided flag becomes false. Places where the MPI layer blocked waiting for RTE to complete an event have been modified to use this macro.
***************************************************************************************
I am reissuing this RFC because of the time that has passed since its original release. Since its initial release and review, I have debugged it further to ensure it fully supports tests like loop_spawn. It therefore seems ready for merge back to the trunk. Given its prior review, I have set the timeout for one week.
The code is in https://bitbucket.org/rhc/ompi-oob2
WHAT: Rewrite of ORTE OOB
WHY: Support asynchronous progress and a host of other features
WHEN: Wed, August 21
SYNOPSIS:
The current OOB has served us well, but a number of limitations have been identified over the years. Specifically:
* it is only progressed when called via opal_progress, which can lead to hangs or recursive calls into libevent (which is not supported by that code)
* we've had issues when multiple NICs are available as the code doesn't "shift" messages between transports - thus, all nodes had to be available via the same TCP interface.
* the OOB "unloads" incoming opal_buffer_t objects during the transmission, thus preventing use of OBJ_RETAIN in the code when repeatedly sending the same message to multiple recipients
* there is no failover mechanism across NICs - if the selected NIC (or its attached switch) fails, we are forced to abort
* only one transport (i.e., component) can be "active"
The revised OOB resolves these problems:
* async progress is used for all application processes, with the progress thread blocking in the event library
* each available TCP NIC is supported by its own TCP module. The ability to asynchronously progress each module independently is provided, but not enabled by default (a runtime MCA parameter turns it "on")
* multi-address TCP NICs (e.g., a NIC with both an IPv4 and IPv6 address, or with virtual interfaces) are supported - reachability is determined by comparing the contact info for a peer against all addresses within the range covered by the address/mask pairs for the NIC.
* a message that arrives on one TCP NIC is automatically shifted to whatever NIC that is connected to the next "hop" if that peer cannot be reached by the incoming NIC. If no TCP module will reach the peer, then the OOB attempts to send the message via all other available components - if none can reach the peer, then an "error" is reported back to the RML, which then calls the errmgr for instructions.
* opal_buffer_t now conforms to standard object rules re OBJ_RETAIN as we no longer "unload" the incoming object
* NIC failure is reported to the TCP component, which then tries to resend the message across any other available TCP NIC. If that doesn't work, then the message is given back to the OOB base to try using other components. If all that fails, then the error is reported to the RML, which reports to the errmgr for instructions
* obviously from the above, multiple OOB components (e.g., TCP and UD) can be active in parallel
* the matching code has been moved to the RML (and out of the OOB/TCP component) so it is independent of transport
* routing is done by the individual OOB modules (as opposed to the RML). Thus, both routed and non-routed transports can simultaneously be active
* all blocking send/recv APIs have been removed. Everything operates asynchronously.
KNOWN LIMITATIONS:
* although provision is made for component failover as described above, the code for doing so has not been fully implemented yet. At the moment, if all connections for a given peer fail, the errmgr is notified of a "lost connection", which by default results in termination of the job if it was a lifeline
* the IPv6 code is present and compiles, but is not complete. Since the current IPv6 support in the OOB doesn't work anyway, I don't consider this a blocker
* routing is performed at the individual module level, yet the active routed component is selected on a global basis. We probably should update that to reflect that different transports may need/choose to route in different ways
* obviously, not every error path has been tested nor necessarily covered
* determining abnormal termination is more challenging than in the old code as we now potentially have multiple ways of connecting to a process. Ideally, we would declare "connection failed" when *all* transports can no longer reach the process, but that requires some additional (possibly complex) code. For now, the code replicates the old behavior only somewhat modified - i.e., if a module sees its connection fail, it checks to see if it is a lifeline. If so, it notifies the errmgr that the lifeline is lost - otherwise, it notifies the errmgr that a non-lifeline connection was lost.
* reachability is determined solely on the basis of a shared subnet address/mask - more sophisticated algorithms (e.g., the one used in the tcp btl) are required to handle routing via gateways
* the RML needs to assign sequence numbers to each message on a per-peer basis. The receiving RML will then deliver messages in order, thus preventing out-of-order messaging in the case where messages travel across different transports or a message needs to be redirected/resent due to failure of a NIC
This commit was SVN r29058.
2013-08-22 16:37:40 +00:00
|
|
|
ORTE_RML_TAG_DFS_CMD,
|
2012-10-25 17:15:17 +00:00
|
|
|
orte_rml_send_callback, NULL))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
OBJ_RELEASE(buffer);
|
|
|
|
goto complete;
|
|
|
|
}
|
|
|
|
|
|
|
|
complete:
|
|
|
|
opal_list_remove_item(&active_files, &trk->super);
|
|
|
|
OBJ_RELEASE(trk);
|
2012-10-26 15:49:04 +00:00
|
|
|
if (NULL != close_dfs->close_cbfunc) {
|
|
|
|
close_dfs->close_cbfunc(close_dfs->local_fd, close_dfs->cbdata);
|
|
|
|
}
|
2012-10-25 17:15:17 +00:00
|
|
|
OBJ_RELEASE(close_dfs);
|
|
|
|
}
|
|
|
|
|
2012-10-26 15:49:04 +00:00
|
|
|
static void dfs_close(int fd,
|
|
|
|
orte_dfs_close_callback_fn_t cbfunc,
|
|
|
|
void *cbdata)
|
2012-10-25 17:15:17 +00:00
|
|
|
{
|
|
|
|
orte_dfs_request_t *dfs;
|
|
|
|
|
2013-03-27 21:14:43 +00:00
|
|
|
opal_output_verbose(1, orte_dfs_base_framework.framework_output,
|
2012-10-25 17:15:17 +00:00
|
|
|
"%s close called on fd %d",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), fd);
|
|
|
|
|
|
|
|
dfs = OBJ_NEW(orte_dfs_request_t);
|
|
|
|
dfs->cmd = ORTE_DFS_CLOSE_CMD;
|
|
|
|
dfs->local_fd = fd;
|
2012-10-26 15:49:04 +00:00
|
|
|
dfs->close_cbfunc = cbfunc;
|
|
|
|
dfs->cbdata = cbdata;
|
2012-10-25 17:15:17 +00:00
|
|
|
|
|
|
|
/* post it for processing */
|
|
|
|
ORTE_DFS_POST_REQUEST(dfs, process_close);
|
|
|
|
}
|
|
|
|
|
2012-10-25 22:23:08 +00:00
|
|
|
static void process_sizes(int fd, short args, void *cbdata)
|
|
|
|
{
|
|
|
|
orte_dfs_request_t *size_dfs = (orte_dfs_request_t*)cbdata;
|
|
|
|
orte_dfs_tracker_t *tptr, *trk;
|
|
|
|
opal_list_item_t *item;
|
|
|
|
opal_buffer_t *buffer;
|
|
|
|
int rc;
|
|
|
|
struct stat buf;
|
|
|
|
|
2013-03-27 21:14:43 +00:00
|
|
|
opal_output_verbose(1, orte_dfs_base_framework.framework_output,
|
2012-10-25 22:23:08 +00:00
|
|
|
"%s processing get_size on fd %d",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
|
|
size_dfs->local_fd);
|
|
|
|
|
|
|
|
/* look in our local records for this fd */
|
|
|
|
trk = NULL;
|
|
|
|
for (item = opal_list_get_first(&active_files);
|
|
|
|
item != opal_list_get_end(&active_files);
|
|
|
|
item = opal_list_get_next(item)) {
|
|
|
|
tptr = (orte_dfs_tracker_t*)item;
|
|
|
|
if (tptr->local_fd == size_dfs->local_fd) {
|
|
|
|
trk = tptr;
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if (NULL == trk) {
|
|
|
|
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
|
|
|
OBJ_RELEASE(size_dfs);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
/* if the file is local, execute the seek on it - we
|
|
|
|
* stuck the "whence" value in the remote_fd
|
|
|
|
*/
|
|
|
|
if (trk->host_daemon.vpid == ORTE_PROC_MY_DAEMON->vpid) {
|
|
|
|
/* stat the file and get its size */
|
|
|
|
if (0 > stat(trk->filename, &buf)) {
|
|
|
|
/* cannot stat file */
|
2013-03-27 21:14:43 +00:00
|
|
|
opal_output_verbose(1, orte_dfs_base_framework.framework_output,
|
2012-10-25 22:23:08 +00:00
|
|
|
"%s could not stat %s",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
|
|
trk->filename);
|
|
|
|
if (NULL != size_dfs->size_cbfunc) {
|
|
|
|
size_dfs->size_cbfunc(-1, size_dfs->cbdata);
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
if (NULL != size_dfs->size_cbfunc) {
|
|
|
|
size_dfs->size_cbfunc(buf.st_size, size_dfs->cbdata);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
goto complete;
|
|
|
|
}
|
|
|
|
/* add this request to our local list so we can
|
|
|
|
* match it with the returned response when it comes
|
|
|
|
*/
|
|
|
|
size_dfs->id = req_id++;
|
|
|
|
opal_list_append(&requests, &size_dfs->super);
|
|
|
|
|
|
|
|
/* setup a message for the daemon telling
|
|
|
|
* them what file we want to access
|
|
|
|
*/
|
|
|
|
buffer = OBJ_NEW(opal_buffer_t);
|
|
|
|
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &size_dfs->cmd, 1, ORTE_DFS_CMD_T))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
opal_list_remove_item(&requests, &size_dfs->super);
|
|
|
|
goto complete;
|
|
|
|
}
|
|
|
|
/* pass the request id */
|
|
|
|
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &size_dfs->id, 1, OPAL_UINT64))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
opal_list_remove_item(&requests, &size_dfs->super);
|
|
|
|
goto complete;
|
|
|
|
}
|
|
|
|
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &trk->remote_fd, 1, OPAL_INT))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
opal_list_remove_item(&requests, &size_dfs->super);
|
|
|
|
goto complete;
|
|
|
|
}
|
|
|
|
|
2013-03-27 21:14:43 +00:00
|
|
|
opal_output_verbose(1, orte_dfs_base_framework.framework_output,
|
2012-10-25 22:23:08 +00:00
|
|
|
"%s sending get_size request to %s for fd %d",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
|
|
ORTE_NAME_PRINT(&trk->host_daemon),
|
|
|
|
trk->local_fd);
|
|
|
|
/* send it */
|
|
|
|
if (0 > (rc = orte_rml.send_buffer_nb(&trk->host_daemon, buffer,
|
As per the RFC, bring in the ORTE async progress code and the rewrite of OOB:
*** THIS RFC INCLUDES A MINOR CHANGE TO THE MPI-RTE INTERFACE ***
Note: during the course of this work, it was necessary to completely separate the MPI and RTE progress engines. There were multiple places in the MPI layer where ORTE_WAIT_FOR_COMPLETION was being used. A new OMPI_WAIT_FOR_COMPLETION macro was created (defined in ompi/mca/rte/rte.h) that simply cycles across opal_progress until the provided flag becomes false. Places where the MPI layer blocked waiting for RTE to complete an event have been modified to use this macro.
***************************************************************************************
I am reissuing this RFC because of the time that has passed since its original release. Since its initial release and review, I have debugged it further to ensure it fully supports tests like loop_spawn. It therefore seems ready for merge back to the trunk. Given its prior review, I have set the timeout for one week.
The code is in https://bitbucket.org/rhc/ompi-oob2
WHAT: Rewrite of ORTE OOB
WHY: Support asynchronous progress and a host of other features
WHEN: Wed, August 21
SYNOPSIS:
The current OOB has served us well, but a number of limitations have been identified over the years. Specifically:
* it is only progressed when called via opal_progress, which can lead to hangs or recursive calls into libevent (which is not supported by that code)
* we've had issues when multiple NICs are available as the code doesn't "shift" messages between transports - thus, all nodes had to be available via the same TCP interface.
* the OOB "unloads" incoming opal_buffer_t objects during the transmission, thus preventing use of OBJ_RETAIN in the code when repeatedly sending the same message to multiple recipients
* there is no failover mechanism across NICs - if the selected NIC (or its attached switch) fails, we are forced to abort
* only one transport (i.e., component) can be "active"
The revised OOB resolves these problems:
* async progress is used for all application processes, with the progress thread blocking in the event library
* each available TCP NIC is supported by its own TCP module. The ability to asynchronously progress each module independently is provided, but not enabled by default (a runtime MCA parameter turns it "on")
* multi-address TCP NICs (e.g., a NIC with both an IPv4 and IPv6 address, or with virtual interfaces) are supported - reachability is determined by comparing the contact info for a peer against all addresses within the range covered by the address/mask pairs for the NIC.
* a message that arrives on one TCP NIC is automatically shifted to whatever NIC that is connected to the next "hop" if that peer cannot be reached by the incoming NIC. If no TCP module will reach the peer, then the OOB attempts to send the message via all other available components - if none can reach the peer, then an "error" is reported back to the RML, which then calls the errmgr for instructions.
* opal_buffer_t now conforms to standard object rules re OBJ_RETAIN as we no longer "unload" the incoming object
* NIC failure is reported to the TCP component, which then tries to resend the message across any other available TCP NIC. If that doesn't work, then the message is given back to the OOB base to try using other components. If all that fails, then the error is reported to the RML, which reports to the errmgr for instructions
* obviously from the above, multiple OOB components (e.g., TCP and UD) can be active in parallel
* the matching code has been moved to the RML (and out of the OOB/TCP component) so it is independent of transport
* routing is done by the individual OOB modules (as opposed to the RML). Thus, both routed and non-routed transports can simultaneously be active
* all blocking send/recv APIs have been removed. Everything operates asynchronously.
KNOWN LIMITATIONS:
* although provision is made for component failover as described above, the code for doing so has not been fully implemented yet. At the moment, if all connections for a given peer fail, the errmgr is notified of a "lost connection", which by default results in termination of the job if it was a lifeline
* the IPv6 code is present and compiles, but is not complete. Since the current IPv6 support in the OOB doesn't work anyway, I don't consider this a blocker
* routing is performed at the individual module level, yet the active routed component is selected on a global basis. We probably should update that to reflect that different transports may need/choose to route in different ways
* obviously, not every error path has been tested nor necessarily covered
* determining abnormal termination is more challenging than in the old code as we now potentially have multiple ways of connecting to a process. Ideally, we would declare "connection failed" when *all* transports can no longer reach the process, but that requires some additional (possibly complex) code. For now, the code replicates the old behavior only somewhat modified - i.e., if a module sees its connection fail, it checks to see if it is a lifeline. If so, it notifies the errmgr that the lifeline is lost - otherwise, it notifies the errmgr that a non-lifeline connection was lost.
* reachability is determined solely on the basis of a shared subnet address/mask - more sophisticated algorithms (e.g., the one used in the tcp btl) are required to handle routing via gateways
* the RML needs to assign sequence numbers to each message on a per-peer basis. The receiving RML will then deliver messages in order, thus preventing out-of-order messaging in the case where messages travel across different transports or a message needs to be redirected/resent due to failure of a NIC
This commit was SVN r29058.
2013-08-22 16:37:40 +00:00
|
|
|
ORTE_RML_TAG_DFS_CMD,
|
2012-10-25 22:23:08 +00:00
|
|
|
orte_rml_send_callback, NULL))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
OBJ_RELEASE(buffer);
|
|
|
|
opal_list_remove_item(&requests, &size_dfs->super);
|
|
|
|
if (NULL != size_dfs->size_cbfunc) {
|
|
|
|
size_dfs->size_cbfunc(-1, size_dfs->cbdata);
|
|
|
|
}
|
|
|
|
goto complete;
|
|
|
|
}
|
|
|
|
/* leave the request there */
|
|
|
|
return;
|
|
|
|
|
|
|
|
complete:
|
|
|
|
OBJ_RELEASE(size_dfs);
|
|
|
|
}
|
|
|
|
|
|
|
|
static void dfs_get_file_size(int fd,
|
|
|
|
orte_dfs_size_callback_fn_t cbfunc,
|
|
|
|
void *cbdata)
|
|
|
|
{
|
|
|
|
orte_dfs_request_t *dfs;
|
|
|
|
|
2013-03-27 21:14:43 +00:00
|
|
|
opal_output_verbose(1, orte_dfs_base_framework.framework_output,
|
2012-10-25 22:23:08 +00:00
|
|
|
"%s get_size called on fd %d",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), fd);
|
|
|
|
|
|
|
|
dfs = OBJ_NEW(orte_dfs_request_t);
|
|
|
|
dfs->cmd = ORTE_DFS_SIZE_CMD;
|
|
|
|
dfs->local_fd = fd;
|
|
|
|
dfs->size_cbfunc = cbfunc;
|
|
|
|
dfs->cbdata = cbdata;
|
|
|
|
|
|
|
|
/* post it for processing */
|
|
|
|
ORTE_DFS_POST_REQUEST(dfs, process_sizes);
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2012-10-25 17:15:17 +00:00
|
|
|
static void process_seeks(int fd, short args, void *cbdata)
|
|
|
|
{
|
|
|
|
orte_dfs_request_t *seek_dfs = (orte_dfs_request_t*)cbdata;
|
|
|
|
orte_dfs_tracker_t *tptr, *trk;
|
|
|
|
opal_list_item_t *item;
|
|
|
|
opal_buffer_t *buffer;
|
|
|
|
int64_t i64;
|
|
|
|
int rc;
|
2012-10-26 15:49:04 +00:00
|
|
|
struct stat buf;
|
2012-10-25 17:15:17 +00:00
|
|
|
|
2013-03-27 21:14:43 +00:00
|
|
|
opal_output_verbose(1, orte_dfs_base_framework.framework_output,
|
2012-10-25 17:15:17 +00:00
|
|
|
"%s processing seek on fd %d",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
|
|
seek_dfs->local_fd);
|
|
|
|
|
|
|
|
/* look in our local records for this fd */
|
|
|
|
trk = NULL;
|
|
|
|
for (item = opal_list_get_first(&active_files);
|
|
|
|
item != opal_list_get_end(&active_files);
|
|
|
|
item = opal_list_get_next(item)) {
|
|
|
|
tptr = (orte_dfs_tracker_t*)item;
|
|
|
|
if (tptr->local_fd == seek_dfs->local_fd) {
|
|
|
|
trk = tptr;
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if (NULL == trk) {
|
|
|
|
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
|
|
|
OBJ_RELEASE(seek_dfs);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
2012-10-25 22:23:08 +00:00
|
|
|
/* if the file is local, execute the seek on it - we
|
|
|
|
* stuck the "whence" value in the remote_fd
|
|
|
|
*/
|
2012-10-25 17:15:17 +00:00
|
|
|
if (trk->host_daemon.vpid == ORTE_PROC_MY_DAEMON->vpid) {
|
2013-03-27 21:14:43 +00:00
|
|
|
opal_output_verbose(1, orte_dfs_base_framework.framework_output,
|
2012-10-25 22:23:08 +00:00
|
|
|
"%s local seek on fd %d",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
|
|
seek_dfs->local_fd);
|
2012-10-26 15:49:04 +00:00
|
|
|
/* stat the file and get its size */
|
|
|
|
if (0 > stat(trk->filename, &buf)) {
|
|
|
|
/* cannot stat file */
|
2013-03-27 21:14:43 +00:00
|
|
|
opal_output_verbose(1, orte_dfs_base_framework.framework_output,
|
2012-10-26 15:49:04 +00:00
|
|
|
"%s could not stat %s",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
|
|
trk->filename);
|
|
|
|
if (NULL != seek_dfs->seek_cbfunc) {
|
|
|
|
seek_dfs->seek_cbfunc(-1, seek_dfs->cbdata);
|
|
|
|
}
|
|
|
|
} else if (buf.st_size < seek_dfs->read_length &&
|
|
|
|
SEEK_SET == seek_dfs->remote_fd) {
|
|
|
|
/* seek would take us past EOF */
|
|
|
|
if (NULL != seek_dfs->seek_cbfunc) {
|
|
|
|
seek_dfs->seek_cbfunc(-1, seek_dfs->cbdata);
|
|
|
|
}
|
|
|
|
} else if (buf.st_size < (off_t)(trk->location + seek_dfs->read_length) &&
|
|
|
|
SEEK_CUR == seek_dfs->remote_fd) {
|
|
|
|
/* seek would take us past EOF */
|
|
|
|
if (NULL != seek_dfs->seek_cbfunc) {
|
|
|
|
seek_dfs->seek_cbfunc(-1, seek_dfs->cbdata);
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
lseek(trk->remote_fd, seek_dfs->read_length, seek_dfs->remote_fd);
|
|
|
|
if (SEEK_SET == seek_dfs->remote_fd) {
|
|
|
|
trk->location = seek_dfs->read_length;
|
|
|
|
} else {
|
|
|
|
trk->location += seek_dfs->read_length;
|
|
|
|
}
|
|
|
|
if (NULL != seek_dfs->seek_cbfunc) {
|
|
|
|
seek_dfs->seek_cbfunc(seek_dfs->read_length, seek_dfs->cbdata);
|
|
|
|
}
|
|
|
|
}
|
2012-10-25 17:15:17 +00:00
|
|
|
goto complete;
|
|
|
|
}
|
2012-10-26 15:49:04 +00:00
|
|
|
/* add this request to our local list so we can
|
|
|
|
* match it with the returned response when it comes
|
|
|
|
*/
|
|
|
|
seek_dfs->id = req_id++;
|
|
|
|
opal_list_append(&requests, &seek_dfs->super);
|
2012-10-25 17:15:17 +00:00
|
|
|
|
|
|
|
/* setup a message for the daemon telling
|
|
|
|
* them what file to seek
|
|
|
|
*/
|
|
|
|
buffer = OBJ_NEW(opal_buffer_t);
|
|
|
|
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &seek_dfs->cmd, 1, ORTE_DFS_CMD_T))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
goto complete;
|
|
|
|
}
|
2012-10-26 15:49:04 +00:00
|
|
|
/* pass the request id */
|
|
|
|
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &seek_dfs->id, 1, OPAL_UINT64))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
opal_list_remove_item(&requests, &seek_dfs->super);
|
|
|
|
goto complete;
|
|
|
|
}
|
2012-10-25 17:15:17 +00:00
|
|
|
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &trk->remote_fd, 1, OPAL_INT))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
goto complete;
|
|
|
|
}
|
|
|
|
i64 = (int64_t)seek_dfs->read_length;
|
|
|
|
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &i64, 1, OPAL_INT64))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
goto complete;
|
|
|
|
}
|
2012-10-25 22:23:08 +00:00
|
|
|
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &seek_dfs->remote_fd, 1, OPAL_INT))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
goto complete;
|
|
|
|
}
|
|
|
|
|
2013-03-27 21:14:43 +00:00
|
|
|
opal_output_verbose(1, orte_dfs_base_framework.framework_output,
|
2012-10-25 17:15:17 +00:00
|
|
|
"%s sending seek file request to %s for fd %d",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
|
|
ORTE_NAME_PRINT(&trk->host_daemon),
|
|
|
|
trk->local_fd);
|
|
|
|
/* send it */
|
|
|
|
if (0 > (rc = orte_rml.send_buffer_nb(&trk->host_daemon, buffer,
|
As per the RFC, bring in the ORTE async progress code and the rewrite of OOB:
*** THIS RFC INCLUDES A MINOR CHANGE TO THE MPI-RTE INTERFACE ***
Note: during the course of this work, it was necessary to completely separate the MPI and RTE progress engines. There were multiple places in the MPI layer where ORTE_WAIT_FOR_COMPLETION was being used. A new OMPI_WAIT_FOR_COMPLETION macro was created (defined in ompi/mca/rte/rte.h) that simply cycles across opal_progress until the provided flag becomes false. Places where the MPI layer blocked waiting for RTE to complete an event have been modified to use this macro.
***************************************************************************************
I am reissuing this RFC because of the time that has passed since its original release. Since its initial release and review, I have debugged it further to ensure it fully supports tests like loop_spawn. It therefore seems ready for merge back to the trunk. Given its prior review, I have set the timeout for one week.
The code is in https://bitbucket.org/rhc/ompi-oob2
WHAT: Rewrite of ORTE OOB
WHY: Support asynchronous progress and a host of other features
WHEN: Wed, August 21
SYNOPSIS:
The current OOB has served us well, but a number of limitations have been identified over the years. Specifically:
* it is only progressed when called via opal_progress, which can lead to hangs or recursive calls into libevent (which is not supported by that code)
* we've had issues when multiple NICs are available as the code doesn't "shift" messages between transports - thus, all nodes had to be available via the same TCP interface.
* the OOB "unloads" incoming opal_buffer_t objects during the transmission, thus preventing use of OBJ_RETAIN in the code when repeatedly sending the same message to multiple recipients
* there is no failover mechanism across NICs - if the selected NIC (or its attached switch) fails, we are forced to abort
* only one transport (i.e., component) can be "active"
The revised OOB resolves these problems:
* async progress is used for all application processes, with the progress thread blocking in the event library
* each available TCP NIC is supported by its own TCP module. The ability to asynchronously progress each module independently is provided, but not enabled by default (a runtime MCA parameter turns it "on")
* multi-address TCP NICs (e.g., a NIC with both an IPv4 and IPv6 address, or with virtual interfaces) are supported - reachability is determined by comparing the contact info for a peer against all addresses within the range covered by the address/mask pairs for the NIC.
* a message that arrives on one TCP NIC is automatically shifted to whatever NIC that is connected to the next "hop" if that peer cannot be reached by the incoming NIC. If no TCP module will reach the peer, then the OOB attempts to send the message via all other available components - if none can reach the peer, then an "error" is reported back to the RML, which then calls the errmgr for instructions.
* opal_buffer_t now conforms to standard object rules re OBJ_RETAIN as we no longer "unload" the incoming object
* NIC failure is reported to the TCP component, which then tries to resend the message across any other available TCP NIC. If that doesn't work, then the message is given back to the OOB base to try using other components. If all that fails, then the error is reported to the RML, which reports to the errmgr for instructions
* obviously from the above, multiple OOB components (e.g., TCP and UD) can be active in parallel
* the matching code has been moved to the RML (and out of the OOB/TCP component) so it is independent of transport
* routing is done by the individual OOB modules (as opposed to the RML). Thus, both routed and non-routed transports can simultaneously be active
* all blocking send/recv APIs have been removed. Everything operates asynchronously.
KNOWN LIMITATIONS:
* although provision is made for component failover as described above, the code for doing so has not been fully implemented yet. At the moment, if all connections for a given peer fail, the errmgr is notified of a "lost connection", which by default results in termination of the job if it was a lifeline
* the IPv6 code is present and compiles, but is not complete. Since the current IPv6 support in the OOB doesn't work anyway, I don't consider this a blocker
* routing is performed at the individual module level, yet the active routed component is selected on a global basis. We probably should update that to reflect that different transports may need/choose to route in different ways
* obviously, not every error path has been tested nor necessarily covered
* determining abnormal termination is more challenging than in the old code as we now potentially have multiple ways of connecting to a process. Ideally, we would declare "connection failed" when *all* transports can no longer reach the process, but that requires some additional (possibly complex) code. For now, the code replicates the old behavior only somewhat modified - i.e., if a module sees its connection fail, it checks to see if it is a lifeline. If so, it notifies the errmgr that the lifeline is lost - otherwise, it notifies the errmgr that a non-lifeline connection was lost.
* reachability is determined solely on the basis of a shared subnet address/mask - more sophisticated algorithms (e.g., the one used in the tcp btl) are required to handle routing via gateways
* the RML needs to assign sequence numbers to each message on a per-peer basis. The receiving RML will then deliver messages in order, thus preventing out-of-order messaging in the case where messages travel across different transports or a message needs to be redirected/resent due to failure of a NIC
This commit was SVN r29058.
2013-08-22 16:37:40 +00:00
|
|
|
ORTE_RML_TAG_DFS_CMD,
|
2012-10-25 17:15:17 +00:00
|
|
|
orte_rml_send_callback, NULL))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
OBJ_RELEASE(buffer);
|
|
|
|
goto complete;
|
|
|
|
}
|
2012-10-26 15:49:04 +00:00
|
|
|
/* leave the request */
|
|
|
|
return;
|
2012-10-25 17:15:17 +00:00
|
|
|
|
|
|
|
complete:
|
|
|
|
OBJ_RELEASE(seek_dfs);
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2012-10-26 15:49:04 +00:00
|
|
|
static void dfs_seek(int fd, long offset, int whence,
|
|
|
|
orte_dfs_seek_callback_fn_t cbfunc,
|
|
|
|
void *cbdata)
|
2012-10-25 17:15:17 +00:00
|
|
|
{
|
|
|
|
orte_dfs_request_t *dfs;
|
|
|
|
|
2013-03-27 21:14:43 +00:00
|
|
|
opal_output_verbose(1, orte_dfs_base_framework.framework_output,
|
2012-10-25 17:15:17 +00:00
|
|
|
"%s seek called on fd %d",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), fd);
|
|
|
|
|
|
|
|
dfs = OBJ_NEW(orte_dfs_request_t);
|
|
|
|
dfs->cmd = ORTE_DFS_SEEK_CMD;
|
|
|
|
dfs->local_fd = fd;
|
|
|
|
dfs->read_length = offset;
|
2012-10-25 22:23:08 +00:00
|
|
|
dfs->remote_fd = whence;
|
2012-10-26 15:49:04 +00:00
|
|
|
dfs->seek_cbfunc = cbfunc;
|
|
|
|
dfs->cbdata = cbdata;
|
2012-10-25 17:15:17 +00:00
|
|
|
|
|
|
|
/* post it for processing */
|
|
|
|
ORTE_DFS_POST_REQUEST(dfs, process_seeks);
|
|
|
|
}
|
|
|
|
|
|
|
|
static void process_reads(int fd, short args, void *cbdata)
|
|
|
|
{
|
|
|
|
orte_dfs_request_t *read_dfs = (orte_dfs_request_t*)cbdata;
|
|
|
|
orte_dfs_tracker_t *tptr, *trk;
|
|
|
|
long nbytes;
|
|
|
|
opal_list_item_t *item;
|
|
|
|
opal_buffer_t *buffer;
|
|
|
|
int64_t i64;
|
|
|
|
int rc;
|
|
|
|
|
|
|
|
/* look in our local records for this fd */
|
|
|
|
trk = NULL;
|
|
|
|
for (item = opal_list_get_first(&active_files);
|
|
|
|
item != opal_list_get_end(&active_files);
|
|
|
|
item = opal_list_get_next(item)) {
|
|
|
|
tptr = (orte_dfs_tracker_t*)item;
|
|
|
|
if (tptr->local_fd == read_dfs->local_fd) {
|
|
|
|
trk = tptr;
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if (NULL == trk) {
|
|
|
|
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
|
|
|
|
OBJ_RELEASE(read_dfs);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
/* if the file is local, read the desired bytes */
|
|
|
|
if (trk->host_daemon.vpid == ORTE_PROC_MY_DAEMON->vpid) {
|
|
|
|
nbytes = read(trk->remote_fd, read_dfs->read_buffer, read_dfs->read_length);
|
2012-10-26 15:49:04 +00:00
|
|
|
if (0 < nbytes) {
|
|
|
|
/* update our location */
|
|
|
|
trk->location += nbytes;
|
|
|
|
}
|
2012-10-25 17:15:17 +00:00
|
|
|
/* pass them back to the caller */
|
|
|
|
if (NULL != read_dfs->read_cbfunc) {
|
|
|
|
read_dfs->read_cbfunc(nbytes, read_dfs->read_buffer, read_dfs->cbdata);
|
|
|
|
}
|
|
|
|
/* request is complete */
|
|
|
|
OBJ_RELEASE(read_dfs);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
/* add this request to our pending list */
|
|
|
|
read_dfs->id = req_id++;
|
|
|
|
opal_list_append(&requests, &read_dfs->super);
|
|
|
|
|
|
|
|
/* setup a message for the daemon telling
|
|
|
|
* them what file to read
|
|
|
|
*/
|
|
|
|
buffer = OBJ_NEW(opal_buffer_t);
|
|
|
|
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &read_dfs->cmd, 1, ORTE_DFS_CMD_T))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
goto complete;
|
|
|
|
}
|
|
|
|
/* include the request id */
|
|
|
|
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &read_dfs->id, 1, OPAL_UINT64))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
goto complete;
|
|
|
|
}
|
|
|
|
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &trk->remote_fd, 1, OPAL_INT))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
goto complete;
|
|
|
|
}
|
|
|
|
i64 = (int64_t)read_dfs->read_length;
|
|
|
|
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &i64, 1, OPAL_INT64))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
goto complete;
|
|
|
|
}
|
|
|
|
|
2013-03-27 21:14:43 +00:00
|
|
|
opal_output_verbose(1, orte_dfs_base_framework.framework_output,
|
2012-10-25 17:15:17 +00:00
|
|
|
"%s sending read file request to %s for fd %d",
|
|
|
|
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
|
|
|
ORTE_NAME_PRINT(&trk->host_daemon),
|
|
|
|
trk->local_fd);
|
|
|
|
/* send it */
|
|
|
|
if (0 > (rc = orte_rml.send_buffer_nb(&trk->host_daemon, buffer,
|
As per the RFC, bring in the ORTE async progress code and the rewrite of OOB:
*** THIS RFC INCLUDES A MINOR CHANGE TO THE MPI-RTE INTERFACE ***
Note: during the course of this work, it was necessary to completely separate the MPI and RTE progress engines. There were multiple places in the MPI layer where ORTE_WAIT_FOR_COMPLETION was being used. A new OMPI_WAIT_FOR_COMPLETION macro was created (defined in ompi/mca/rte/rte.h) that simply cycles across opal_progress until the provided flag becomes false. Places where the MPI layer blocked waiting for RTE to complete an event have been modified to use this macro.
***************************************************************************************
I am reissuing this RFC because of the time that has passed since its original release. Since its initial release and review, I have debugged it further to ensure it fully supports tests like loop_spawn. It therefore seems ready for merge back to the trunk. Given its prior review, I have set the timeout for one week.
The code is in https://bitbucket.org/rhc/ompi-oob2
WHAT: Rewrite of ORTE OOB
WHY: Support asynchronous progress and a host of other features
WHEN: Wed, August 21
SYNOPSIS:
The current OOB has served us well, but a number of limitations have been identified over the years. Specifically:
* it is only progressed when called via opal_progress, which can lead to hangs or recursive calls into libevent (which is not supported by that code)
* we've had issues when multiple NICs are available as the code doesn't "shift" messages between transports - thus, all nodes had to be available via the same TCP interface.
* the OOB "unloads" incoming opal_buffer_t objects during the transmission, thus preventing use of OBJ_RETAIN in the code when repeatedly sending the same message to multiple recipients
* there is no failover mechanism across NICs - if the selected NIC (or its attached switch) fails, we are forced to abort
* only one transport (i.e., component) can be "active"
The revised OOB resolves these problems:
* async progress is used for all application processes, with the progress thread blocking in the event library
* each available TCP NIC is supported by its own TCP module. The ability to asynchronously progress each module independently is provided, but not enabled by default (a runtime MCA parameter turns it "on")
* multi-address TCP NICs (e.g., a NIC with both an IPv4 and IPv6 address, or with virtual interfaces) are supported - reachability is determined by comparing the contact info for a peer against all addresses within the range covered by the address/mask pairs for the NIC.
* a message that arrives on one TCP NIC is automatically shifted to whatever NIC that is connected to the next "hop" if that peer cannot be reached by the incoming NIC. If no TCP module will reach the peer, then the OOB attempts to send the message via all other available components - if none can reach the peer, then an "error" is reported back to the RML, which then calls the errmgr for instructions.
* opal_buffer_t now conforms to standard object rules re OBJ_RETAIN as we no longer "unload" the incoming object
* NIC failure is reported to the TCP component, which then tries to resend the message across any other available TCP NIC. If that doesn't work, then the message is given back to the OOB base to try using other components. If all that fails, then the error is reported to the RML, which reports to the errmgr for instructions
* obviously from the above, multiple OOB components (e.g., TCP and UD) can be active in parallel
* the matching code has been moved to the RML (and out of the OOB/TCP component) so it is independent of transport
* routing is done by the individual OOB modules (as opposed to the RML). Thus, both routed and non-routed transports can simultaneously be active
* all blocking send/recv APIs have been removed. Everything operates asynchronously.
KNOWN LIMITATIONS:
* although provision is made for component failover as described above, the code for doing so has not been fully implemented yet. At the moment, if all connections for a given peer fail, the errmgr is notified of a "lost connection", which by default results in termination of the job if it was a lifeline
* the IPv6 code is present and compiles, but is not complete. Since the current IPv6 support in the OOB doesn't work anyway, I don't consider this a blocker
* routing is performed at the individual module level, yet the active routed component is selected on a global basis. We probably should update that to reflect that different transports may need/choose to route in different ways
* obviously, not every error path has been tested nor necessarily covered
* determining abnormal termination is more challenging than in the old code as we now potentially have multiple ways of connecting to a process. Ideally, we would declare "connection failed" when *all* transports can no longer reach the process, but that requires some additional (possibly complex) code. For now, the code replicates the old behavior only somewhat modified - i.e., if a module sees its connection fail, it checks to see if it is a lifeline. If so, it notifies the errmgr that the lifeline is lost - otherwise, it notifies the errmgr that a non-lifeline connection was lost.
* reachability is determined solely on the basis of a shared subnet address/mask - more sophisticated algorithms (e.g., the one used in the tcp btl) are required to handle routing via gateways
* the RML needs to assign sequence numbers to each message on a per-peer basis. The receiving RML will then deliver messages in order, thus preventing out-of-order messaging in the case where messages travel across different transports or a message needs to be redirected/resent due to failure of a NIC
This commit was SVN r29058.
2013-08-22 16:37:40 +00:00
|
|
|
ORTE_RML_TAG_DFS_CMD,
|
2012-10-25 17:15:17 +00:00
|
|
|
orte_rml_send_callback, NULL))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
OBJ_RELEASE(buffer);
|
|
|
|
}
|
|
|
|
/* don't release the request */
|
|
|
|
return;
|
|
|
|
|
|
|
|
complete:
|
|
|
|
/* don't need to hang on to this request */
|
|
|
|
opal_list_remove_item(&requests, &read_dfs->super);
|
|
|
|
OBJ_RELEASE(read_dfs);
|
|
|
|
}
|
|
|
|
|
|
|
|
static void dfs_read(int fd, uint8_t *buffer,
|
|
|
|
long length,
|
|
|
|
orte_dfs_read_callback_fn_t cbfunc,
|
|
|
|
void *cbdata)
|
|
|
|
{
|
|
|
|
orte_dfs_request_t *dfs;
|
|
|
|
|
|
|
|
dfs = OBJ_NEW(orte_dfs_request_t);
|
|
|
|
dfs->cmd = ORTE_DFS_READ_CMD;
|
|
|
|
dfs->local_fd = fd;
|
|
|
|
dfs->read_buffer = buffer;
|
|
|
|
dfs->read_length = length;
|
|
|
|
dfs->read_cbfunc = cbfunc;
|
|
|
|
dfs->cbdata = cbdata;
|
|
|
|
|
|
|
|
/* post it for processing */
|
|
|
|
ORTE_DFS_POST_REQUEST(dfs, process_reads);
|
|
|
|
}
|
2012-10-29 23:05:45 +00:00
|
|
|
|
|
|
|
static void process_posts(int fd, short args, void *cbdata)
|
|
|
|
{
|
|
|
|
orte_dfs_request_t *dfs = (orte_dfs_request_t*)cbdata;
|
|
|
|
opal_buffer_t *buffer;
|
|
|
|
int rc;
|
|
|
|
|
|
|
|
/* we will get confirmation in our receive function, so
|
|
|
|
* add this request to our list */
|
|
|
|
dfs->id = req_id++;
|
|
|
|
opal_list_append(&requests, &dfs->super);
|
|
|
|
|
2012-11-10 14:09:12 +00:00
|
|
|
/* Send the buffer's contents to our local daemon for storage */
|
2012-10-29 23:05:45 +00:00
|
|
|
buffer = OBJ_NEW(opal_buffer_t);
|
|
|
|
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->cmd, 1, ORTE_DFS_CMD_T))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
goto error;
|
|
|
|
}
|
|
|
|
/* include the request id */
|
|
|
|
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->id, 1, OPAL_UINT64))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
goto error;
|
|
|
|
}
|
|
|
|
/* add my name */
|
|
|
|
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
goto error;
|
|
|
|
}
|
2012-11-10 14:09:12 +00:00
|
|
|
/* pack the payload */
|
|
|
|
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->bptr, 1, OPAL_BUFFER))) {
|
2012-10-29 23:05:45 +00:00
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
goto error;
|
|
|
|
}
|
|
|
|
/* send it */
|
|
|
|
if (0 > (rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_DAEMON, buffer,
|
As per the RFC, bring in the ORTE async progress code and the rewrite of OOB:
*** THIS RFC INCLUDES A MINOR CHANGE TO THE MPI-RTE INTERFACE ***
Note: during the course of this work, it was necessary to completely separate the MPI and RTE progress engines. There were multiple places in the MPI layer where ORTE_WAIT_FOR_COMPLETION was being used. A new OMPI_WAIT_FOR_COMPLETION macro was created (defined in ompi/mca/rte/rte.h) that simply cycles across opal_progress until the provided flag becomes false. Places where the MPI layer blocked waiting for RTE to complete an event have been modified to use this macro.
***************************************************************************************
I am reissuing this RFC because of the time that has passed since its original release. Since its initial release and review, I have debugged it further to ensure it fully supports tests like loop_spawn. It therefore seems ready for merge back to the trunk. Given its prior review, I have set the timeout for one week.
The code is in https://bitbucket.org/rhc/ompi-oob2
WHAT: Rewrite of ORTE OOB
WHY: Support asynchronous progress and a host of other features
WHEN: Wed, August 21
SYNOPSIS:
The current OOB has served us well, but a number of limitations have been identified over the years. Specifically:
* it is only progressed when called via opal_progress, which can lead to hangs or recursive calls into libevent (which is not supported by that code)
* we've had issues when multiple NICs are available as the code doesn't "shift" messages between transports - thus, all nodes had to be available via the same TCP interface.
* the OOB "unloads" incoming opal_buffer_t objects during the transmission, thus preventing use of OBJ_RETAIN in the code when repeatedly sending the same message to multiple recipients
* there is no failover mechanism across NICs - if the selected NIC (or its attached switch) fails, we are forced to abort
* only one transport (i.e., component) can be "active"
The revised OOB resolves these problems:
* async progress is used for all application processes, with the progress thread blocking in the event library
* each available TCP NIC is supported by its own TCP module. The ability to asynchronously progress each module independently is provided, but not enabled by default (a runtime MCA parameter turns it "on")
* multi-address TCP NICs (e.g., a NIC with both an IPv4 and IPv6 address, or with virtual interfaces) are supported - reachability is determined by comparing the contact info for a peer against all addresses within the range covered by the address/mask pairs for the NIC.
* a message that arrives on one TCP NIC is automatically shifted to whatever NIC that is connected to the next "hop" if that peer cannot be reached by the incoming NIC. If no TCP module will reach the peer, then the OOB attempts to send the message via all other available components - if none can reach the peer, then an "error" is reported back to the RML, which then calls the errmgr for instructions.
* opal_buffer_t now conforms to standard object rules re OBJ_RETAIN as we no longer "unload" the incoming object
* NIC failure is reported to the TCP component, which then tries to resend the message across any other available TCP NIC. If that doesn't work, then the message is given back to the OOB base to try using other components. If all that fails, then the error is reported to the RML, which reports to the errmgr for instructions
* obviously from the above, multiple OOB components (e.g., TCP and UD) can be active in parallel
* the matching code has been moved to the RML (and out of the OOB/TCP component) so it is independent of transport
* routing is done by the individual OOB modules (as opposed to the RML). Thus, both routed and non-routed transports can simultaneously be active
* all blocking send/recv APIs have been removed. Everything operates asynchronously.
KNOWN LIMITATIONS:
* although provision is made for component failover as described above, the code for doing so has not been fully implemented yet. At the moment, if all connections for a given peer fail, the errmgr is notified of a "lost connection", which by default results in termination of the job if it was a lifeline
* the IPv6 code is present and compiles, but is not complete. Since the current IPv6 support in the OOB doesn't work anyway, I don't consider this a blocker
* routing is performed at the individual module level, yet the active routed component is selected on a global basis. We probably should update that to reflect that different transports may need/choose to route in different ways
* obviously, not every error path has been tested nor necessarily covered
* determining abnormal termination is more challenging than in the old code as we now potentially have multiple ways of connecting to a process. Ideally, we would declare "connection failed" when *all* transports can no longer reach the process, but that requires some additional (possibly complex) code. For now, the code replicates the old behavior only somewhat modified - i.e., if a module sees its connection fail, it checks to see if it is a lifeline. If so, it notifies the errmgr that the lifeline is lost - otherwise, it notifies the errmgr that a non-lifeline connection was lost.
* reachability is determined solely on the basis of a shared subnet address/mask - more sophisticated algorithms (e.g., the one used in the tcp btl) are required to handle routing via gateways
* the RML needs to assign sequence numbers to each message on a per-peer basis. The receiving RML will then deliver messages in order, thus preventing out-of-order messaging in the case where messages travel across different transports or a message needs to be redirected/resent due to failure of a NIC
This commit was SVN r29058.
2013-08-22 16:37:40 +00:00
|
|
|
ORTE_RML_TAG_DFS_CMD,
|
2012-10-29 23:05:45 +00:00
|
|
|
orte_rml_send_callback, NULL))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
goto error;
|
|
|
|
}
|
|
|
|
return;
|
|
|
|
|
|
|
|
error:
|
|
|
|
OBJ_RELEASE(buffer);
|
|
|
|
opal_list_remove_item(&requests, &dfs->super);
|
|
|
|
if (NULL != dfs->post_cbfunc) {
|
|
|
|
dfs->post_cbfunc(dfs->cbdata);
|
|
|
|
}
|
|
|
|
OBJ_RELEASE(dfs);
|
|
|
|
}
|
|
|
|
|
2012-11-10 14:09:12 +00:00
|
|
|
static void dfs_post_file_map(opal_buffer_t *bo,
|
2012-10-29 23:05:45 +00:00
|
|
|
orte_dfs_post_callback_fn_t cbfunc,
|
|
|
|
void *cbdata)
|
|
|
|
{
|
|
|
|
orte_dfs_request_t *dfs;
|
|
|
|
|
|
|
|
dfs = OBJ_NEW(orte_dfs_request_t);
|
|
|
|
dfs->cmd = ORTE_DFS_POST_CMD;
|
2012-11-10 14:09:12 +00:00
|
|
|
dfs->bptr = bo;
|
2012-10-29 23:05:45 +00:00
|
|
|
dfs->post_cbfunc = cbfunc;
|
|
|
|
dfs->cbdata = cbdata;
|
|
|
|
|
|
|
|
/* post it for processing */
|
|
|
|
ORTE_DFS_POST_REQUEST(dfs, process_posts);
|
|
|
|
}
|
|
|
|
|
|
|
|
static void process_getfm(int fd, short args, void *cbdata)
|
|
|
|
{
|
|
|
|
orte_dfs_request_t *dfs = (orte_dfs_request_t*)cbdata;
|
|
|
|
opal_buffer_t *buffer;
|
|
|
|
int rc;
|
|
|
|
|
|
|
|
/* we will get confirmation in our receive function, so
|
|
|
|
* add this request to our list */
|
|
|
|
dfs->id = req_id++;
|
|
|
|
opal_list_append(&requests, &dfs->super);
|
|
|
|
|
|
|
|
/* Send the request to our local daemon */
|
|
|
|
buffer = OBJ_NEW(opal_buffer_t);
|
|
|
|
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->cmd, 1, ORTE_DFS_CMD_T))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
goto error;
|
|
|
|
}
|
|
|
|
/* include the request id */
|
|
|
|
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->id, 1, OPAL_UINT64))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
goto error;
|
|
|
|
}
|
|
|
|
/* and the target */
|
|
|
|
if (OPAL_SUCCESS != (rc = opal_dss.pack(buffer, &dfs->target, 1, ORTE_NAME))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
goto error;
|
|
|
|
}
|
|
|
|
/* send it */
|
|
|
|
if (0 > (rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_DAEMON, buffer,
|
As per the RFC, bring in the ORTE async progress code and the rewrite of OOB:
*** THIS RFC INCLUDES A MINOR CHANGE TO THE MPI-RTE INTERFACE ***
Note: during the course of this work, it was necessary to completely separate the MPI and RTE progress engines. There were multiple places in the MPI layer where ORTE_WAIT_FOR_COMPLETION was being used. A new OMPI_WAIT_FOR_COMPLETION macro was created (defined in ompi/mca/rte/rte.h) that simply cycles across opal_progress until the provided flag becomes false. Places where the MPI layer blocked waiting for RTE to complete an event have been modified to use this macro.
***************************************************************************************
I am reissuing this RFC because of the time that has passed since its original release. Since its initial release and review, I have debugged it further to ensure it fully supports tests like loop_spawn. It therefore seems ready for merge back to the trunk. Given its prior review, I have set the timeout for one week.
The code is in https://bitbucket.org/rhc/ompi-oob2
WHAT: Rewrite of ORTE OOB
WHY: Support asynchronous progress and a host of other features
WHEN: Wed, August 21
SYNOPSIS:
The current OOB has served us well, but a number of limitations have been identified over the years. Specifically:
* it is only progressed when called via opal_progress, which can lead to hangs or recursive calls into libevent (which is not supported by that code)
* we've had issues when multiple NICs are available as the code doesn't "shift" messages between transports - thus, all nodes had to be available via the same TCP interface.
* the OOB "unloads" incoming opal_buffer_t objects during the transmission, thus preventing use of OBJ_RETAIN in the code when repeatedly sending the same message to multiple recipients
* there is no failover mechanism across NICs - if the selected NIC (or its attached switch) fails, we are forced to abort
* only one transport (i.e., component) can be "active"
The revised OOB resolves these problems:
* async progress is used for all application processes, with the progress thread blocking in the event library
* each available TCP NIC is supported by its own TCP module. The ability to asynchronously progress each module independently is provided, but not enabled by default (a runtime MCA parameter turns it "on")
* multi-address TCP NICs (e.g., a NIC with both an IPv4 and IPv6 address, or with virtual interfaces) are supported - reachability is determined by comparing the contact info for a peer against all addresses within the range covered by the address/mask pairs for the NIC.
* a message that arrives on one TCP NIC is automatically shifted to whichever NIC is connected to the next "hop" if that peer cannot be reached by the incoming NIC. If no TCP module will reach the peer, then the OOB attempts to send the message via all other available components - if none can reach the peer, then an "error" is reported back to the RML, which then calls the errmgr for instructions.
* opal_buffer_t now conforms to standard object rules re OBJ_RETAIN as we no longer "unload" the incoming object
* NIC failure is reported to the TCP component, which then tries to resend the message across any other available TCP NIC. If that doesn't work, then the message is given back to the OOB base to try using other components. If all that fails, then the error is reported to the RML, which reports to the errmgr for instructions
* obviously from the above, multiple OOB components (e.g., TCP and UD) can be active in parallel
* the matching code has been moved to the RML (and out of the OOB/TCP component) so it is independent of transport
* routing is done by the individual OOB modules (as opposed to the RML). Thus, both routed and non-routed transports can simultaneously be active
* all blocking send/recv APIs have been removed. Everything operates asynchronously.
KNOWN LIMITATIONS:
* although provision is made for component failover as described above, the code for doing so has not been fully implemented yet. At the moment, if all connections for a given peer fail, the errmgr is notified of a "lost connection", which by default results in termination of the job if it was a lifeline
* the IPv6 code is present and compiles, but is not complete. Since the current IPv6 support in the OOB doesn't work anyway, I don't consider this a blocker
* routing is performed at the individual module level, yet the active routed component is selected on a global basis. We probably should update that to reflect that different transports may need/choose to route in different ways
* obviously, not every error path has been tested nor necessarily covered
* determining abnormal termination is more challenging than in the old code as we now potentially have multiple ways of connecting to a process. Ideally, we would declare "connection failed" when *all* transports can no longer reach the process, but that requires some additional (possibly complex) code. For now, the code replicates the old behavior in a somewhat modified form - i.e., if a module sees its connection fail, it checks to see if it is a lifeline. If so, it notifies the errmgr that the lifeline is lost - otherwise, it notifies the errmgr that a non-lifeline connection was lost.
* reachability is determined solely on the basis of a shared subnet address/mask - more sophisticated algorithms (e.g., the one used in the tcp btl) are required to handle routing via gateways
* the RML needs to assign sequence numbers to each message on a per-peer basis. The receiving RML will then deliver messages in order, thus preventing out-of-order messaging in the case where messages travel across different transports or a message needs to be redirected/resent due to failure of a NIC
This commit was SVN r29058.
2013-08-22 16:37:40 +00:00
|
|
|
ORTE_RML_TAG_DFS_CMD,
|
2012-10-29 23:05:45 +00:00
|
|
|
orte_rml_send_callback, NULL))) {
|
|
|
|
ORTE_ERROR_LOG(rc);
|
|
|
|
goto error;
|
|
|
|
}
|
|
|
|
return;
|
|
|
|
|
|
|
|
error:
|
|
|
|
OBJ_RELEASE(buffer);
|
|
|
|
opal_list_remove_item(&requests, &dfs->super);
|
|
|
|
if (NULL != dfs->fm_cbfunc) {
|
|
|
|
dfs->fm_cbfunc(NULL, dfs->cbdata);
|
|
|
|
}
|
|
|
|
OBJ_RELEASE(dfs);
|
|
|
|
}
|
|
|
|
|
|
|
|
static void dfs_get_file_map(orte_process_name_t *target,
|
|
|
|
orte_dfs_fm_callback_fn_t cbfunc,
|
|
|
|
void *cbdata)
|
|
|
|
{
|
|
|
|
orte_dfs_request_t *dfs;
|
|
|
|
|
|
|
|
dfs = OBJ_NEW(orte_dfs_request_t);
|
|
|
|
dfs->cmd = ORTE_DFS_GETFM_CMD;
|
|
|
|
dfs->target.jobid = target->jobid;
|
|
|
|
dfs->target.vpid = target->vpid;
|
|
|
|
dfs->fm_cbfunc = cbfunc;
|
|
|
|
dfs->cbdata = cbdata;
|
|
|
|
|
|
|
|
/* post it for processing */
|
|
|
|
ORTE_DFS_POST_REQUEST(dfs, process_getfm);
|
|
|
|
}
|
|
|
|
|
|
|
|
static void dfs_load_file_maps(orte_jobid_t jobid,
|
2012-11-10 14:09:12 +00:00
|
|
|
opal_buffer_t *bo,
|
2012-10-29 23:05:45 +00:00
|
|
|
orte_dfs_load_callback_fn_t cbfunc,
|
|
|
|
void *cbdata)
|
|
|
|
{
|
|
|
|
/* apps don't store file maps */
|
|
|
|
if (NULL != cbfunc) {
|
|
|
|
cbfunc(cbdata);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
static void dfs_purge_file_maps(orte_jobid_t jobid,
|
|
|
|
orte_dfs_purge_callback_fn_t cbfunc,
|
|
|
|
void *cbdata)
|
|
|
|
{
|
|
|
|
/* apps don't store file maps */
|
|
|
|
if (NULL != cbfunc) {
|
|
|
|
cbfunc(cbdata);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|