SSTORE: use dynamic buffers for rml.send and rml.recv
The sstore component was still using static buffers for send_buffer_nb(). This patch changes opal_buffer_t buffer; to opal_buffer_t *buffer; This commit was SVN r30485.
Этот коммит содержится в:
родитель
2900f24b67
Коммит
d5c1e33900
@ -439,30 +439,30 @@ static orte_sstore_central_app_snapshot_info_t *find_handle_info(orte_sstore_bas
|
||||
static int pull_handle_info(orte_sstore_central_app_snapshot_info_t *handle_info )
|
||||
{
|
||||
int ret, exit_status = ORTE_SUCCESS;
|
||||
opal_buffer_t buffer;
|
||||
opal_buffer_t *buffer = NULL;
|
||||
orte_sstore_central_cmd_flag_t command;
|
||||
orte_std_cntr_t count;
|
||||
orte_sstore_base_handle_t loc_id;
|
||||
|
||||
OBJ_CONSTRUCT(&buffer, opal_buffer_t);
|
||||
buffer = OBJ_NEW(opal_buffer_t);
|
||||
|
||||
/*
|
||||
* Ask the daemon to send us the info that we need
|
||||
*/
|
||||
command = ORTE_SSTORE_CENTRAL_PULL;
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &command, 1, ORTE_SSTORE_CENTRAL_CMD )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SSTORE_CENTRAL_CMD))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_DAEMON, &buffer,
|
||||
if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_DAEMON, buffer,
|
||||
ORTE_RML_TAG_SSTORE_INTERNAL,
|
||||
orte_rml_send_callback, NULL))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
@ -473,8 +473,7 @@ static int pull_handle_info(orte_sstore_central_app_snapshot_info_t *handle_info
|
||||
/*
|
||||
* Receive the response
|
||||
*/
|
||||
OBJ_DESTRUCT(&buffer);
|
||||
OBJ_CONSTRUCT(&buffer, opal_buffer_t);
|
||||
buffer = OBJ_NEW(opal_buffer_t);
|
||||
OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
|
||||
"sstore:central:(app): pull() from %s -> %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
@ -494,15 +493,17 @@ static int pull_handle_info(orte_sstore_central_app_snapshot_info_t *handle_info
|
||||
orte_rml.recv_buffer_nb(ORTE_PROC_MY_DAEMON, ORTE_RML_TAG_SSTORE_INTERNAL,
|
||||
0, orte_rml_recv_callback, NULL);
|
||||
|
||||
/* wait for completion */
|
||||
|
||||
count = 1;
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&buffer, &command, &count, ORTE_SSTORE_CENTRAL_CMD))) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &command, &count, ORTE_SSTORE_CENTRAL_CMD))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
count = 1;
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&buffer, &loc_id, &count, ORTE_SSTORE_HANDLE )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &loc_id, &count, ORTE_SSTORE_HANDLE))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
@ -512,28 +513,28 @@ static int pull_handle_info(orte_sstore_central_app_snapshot_info_t *handle_info
|
||||
}
|
||||
|
||||
count = 1;
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&buffer, &(handle_info->seq_num), &count, OPAL_INT))) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->seq_num), &count, OPAL_INT))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
count = 1;
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&buffer, &(handle_info->global_ref_name), &count, OPAL_STRING))) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->global_ref_name), &count, OPAL_STRING))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
count = 1;
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&buffer, &(handle_info->local_location), &count, OPAL_STRING))) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->local_location), &count, OPAL_STRING))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
count = 1;
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&buffer, &(handle_info->metadata_filename), &count, OPAL_STRING))) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->metadata_filename), &count, OPAL_STRING))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
@ -548,7 +549,10 @@ static int pull_handle_info(orte_sstore_central_app_snapshot_info_t *handle_info
|
||||
handle_info->global_ref_name
|
||||
));
|
||||
cleanup:
|
||||
OBJ_DESTRUCT(&buffer);
|
||||
if (NULL != buffer) {
|
||||
OBJ_RELEASE(buffer);
|
||||
buffer = NULL;
|
||||
}
|
||||
|
||||
return exit_status;
|
||||
}
|
||||
@ -556,39 +560,39 @@ static int pull_handle_info(orte_sstore_central_app_snapshot_info_t *handle_info
|
||||
static int push_handle_info(orte_sstore_central_app_snapshot_info_t *handle_info )
|
||||
{
|
||||
int ret, exit_status = ORTE_SUCCESS;
|
||||
opal_buffer_t buffer;
|
||||
opal_buffer_t *buffer = NULL;
|
||||
orte_sstore_central_cmd_flag_t command;
|
||||
|
||||
OBJ_CONSTRUCT(&buffer, opal_buffer_t);
|
||||
buffer = OBJ_NEW(opal_buffer_t);
|
||||
|
||||
command = ORTE_SSTORE_CENTRAL_PUSH;
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &command, 1, ORTE_SSTORE_CENTRAL_CMD )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SSTORE_CENTRAL_CMD))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &(handle_info->ckpt_skipped), 1, OPAL_BOOL )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->ckpt_skipped), 1, OPAL_BOOL))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if( !handle_info->ckpt_skipped ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &(handle_info->crs_comp), 1, OPAL_STRING )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->crs_comp), 1, OPAL_STRING))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_DAEMON, &buffer,
|
||||
if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_DAEMON, buffer,
|
||||
ORTE_RML_TAG_SSTORE_INTERNAL,
|
||||
orte_rml_send_callback, NULL))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
@ -596,8 +600,14 @@ static int push_handle_info(orte_sstore_central_app_snapshot_info_t *handle_info
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/* buffer should not be released here; the callback releases it */
|
||||
buffer = NULL;
|
||||
|
||||
cleanup:
|
||||
OBJ_DESTRUCT(&buffer);
|
||||
if (NULL != buffer) {
|
||||
OBJ_RELEASE(buffer);
|
||||
buffer = NULL;
|
||||
}
|
||||
|
||||
return exit_status;
|
||||
}
|
||||
|
@ -887,55 +887,59 @@ static void sstore_central_global_recv(int status,
|
||||
static int process_local_pull(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_central_global_snapshot_info_t *handle_info)
|
||||
{
|
||||
int ret, exit_status = ORTE_SUCCESS;
|
||||
opal_buffer_t loc_buffer;
|
||||
opal_buffer_t *loc_buffer = NULL;
|
||||
orte_sstore_central_cmd_flag_t command;
|
||||
|
||||
/*
|
||||
* Push back the requested information
|
||||
*/
|
||||
OBJ_CONSTRUCT(&loc_buffer, opal_buffer_t);
|
||||
loc_buffer = OBJ_NEW(opal_buffer_t);
|
||||
|
||||
command = ORTE_SSTORE_CENTRAL_PUSH;
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&loc_buffer, &command, 1, ORTE_SSTORE_CENTRAL_CMD )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &command, 1, ORTE_SSTORE_CENTRAL_CMD))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&loc_buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&loc_buffer, &(handle_info->seq_num), 1, OPAL_INT )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->seq_num), 1, OPAL_INT))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&loc_buffer, &(handle_info->ref_name), 1, OPAL_STRING )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->ref_name), 1, OPAL_STRING))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&loc_buffer, &(handle_info->app_location_fmt), 1, OPAL_STRING )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->app_location_fmt), 1, OPAL_STRING))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(peer, &loc_buffer,
|
||||
ORTE_RML_TAG_SSTORE_INTERNAL,
|
||||
if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(peer, loc_buffer, ORTE_RML_TAG_SSTORE_INTERNAL,
|
||||
orte_rml_send_callback, NULL))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
/* loc_buffer should not be released here; the callback releases it */
|
||||
loc_buffer = NULL;
|
||||
|
||||
cleanup:
|
||||
OBJ_DESTRUCT(&loc_buffer);
|
||||
if (NULL != loc_buffer) {
|
||||
OBJ_RELEASE(loc_buffer);
|
||||
loc_buffer = NULL;
|
||||
}
|
||||
|
||||
return exit_status;
|
||||
}
|
||||
|
@ -709,7 +709,7 @@ static int process_global_push(orte_process_name_t* peer, opal_buffer_t* buffer,
|
||||
static int process_app_pull(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_central_local_snapshot_info_t *handle_info)
|
||||
{
|
||||
int ret, exit_status = ORTE_SUCCESS;
|
||||
opal_buffer_t loc_buffer;
|
||||
opal_buffer_t *loc_buffer = NULL;
|
||||
orte_sstore_central_cmd_flag_t command;
|
||||
orte_sstore_central_local_app_snapshot_info_t *app_info = NULL;
|
||||
|
||||
@ -721,55 +721,59 @@ static int process_app_pull(orte_process_name_t* peer, opal_buffer_t* buffer, or
|
||||
/*
|
||||
* Push back the requested information
|
||||
*/
|
||||
OBJ_CONSTRUCT(&loc_buffer, opal_buffer_t);
|
||||
loc_buffer = OBJ_NEW(opal_buffer_t);
|
||||
|
||||
command = ORTE_SSTORE_CENTRAL_PUSH;
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&loc_buffer, &command, 1, ORTE_SSTORE_CENTRAL_CMD )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &command, 1, ORTE_SSTORE_CENTRAL_CMD))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&loc_buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&loc_buffer, &(handle_info->seq_num), 1, OPAL_INT )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->seq_num), 1, OPAL_INT))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&loc_buffer, &(handle_info->global_ref_name), 1, OPAL_STRING )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->global_ref_name), 1, OPAL_STRING))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&loc_buffer, &(app_info->local_location), 1, OPAL_STRING )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(app_info->local_location), 1, OPAL_STRING))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&loc_buffer, &(app_info->metadata_filename), 1, OPAL_STRING )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(app_info->metadata_filename), 1, OPAL_STRING))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(peer, &loc_buffer,
|
||||
ORTE_RML_TAG_SSTORE_INTERNAL,
|
||||
if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(peer, loc_buffer, ORTE_RML_TAG_SSTORE_INTERNAL,
|
||||
orte_rml_send_callback, NULL))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
/* loc_buffer should not be released here; the callback releases it */
|
||||
loc_buffer = NULL;
|
||||
|
||||
cleanup:
|
||||
OBJ_DESTRUCT(&loc_buffer);
|
||||
if (NULL != loc_buffer) {
|
||||
OBJ_RELEASE(loc_buffer);
|
||||
loc_buffer = NULL;
|
||||
}
|
||||
|
||||
return exit_status;
|
||||
}
|
||||
@ -847,7 +851,7 @@ static int wait_all_apps_updated(orte_sstore_central_local_snapshot_info_t *hand
|
||||
static int pull_handle_info(orte_sstore_central_local_snapshot_info_t *handle_info )
|
||||
{
|
||||
int ret, exit_status = ORTE_SUCCESS;
|
||||
opal_buffer_t buffer;
|
||||
opal_buffer_t *buffer = NULL;
|
||||
orte_sstore_central_cmd_flag_t command;
|
||||
|
||||
/*
|
||||
@ -861,25 +865,25 @@ static int pull_handle_info(orte_sstore_central_local_snapshot_info_t *handle_in
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
OBJ_CONSTRUCT(&buffer, opal_buffer_t);
|
||||
buffer = OBJ_NEW(opal_buffer_t);
|
||||
|
||||
/*
|
||||
* Ask the daemon to send us the info that we need
|
||||
*/
|
||||
command = ORTE_SSTORE_CENTRAL_PULL;
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &command, 1, ORTE_SSTORE_CENTRAL_CMD )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SSTORE_CENTRAL_CMD))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, &buffer,
|
||||
if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buffer,
|
||||
ORTE_RML_TAG_SSTORE_INTERNAL,
|
||||
orte_rml_send_callback, NULL))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
@ -887,8 +891,14 @@ static int pull_handle_info(orte_sstore_central_local_snapshot_info_t *handle_in
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/* buffer should not be released here; the callback releases it */
|
||||
buffer = NULL;
|
||||
|
||||
cleanup:
|
||||
OBJ_DESTRUCT(&buffer);
|
||||
if (NULL != buffer) {
|
||||
OBJ_RELEASE(buffer);
|
||||
buffer = NULL;
|
||||
}
|
||||
|
||||
return exit_status;
|
||||
}
|
||||
@ -896,29 +906,29 @@ static int pull_handle_info(orte_sstore_central_local_snapshot_info_t *handle_in
|
||||
static int push_handle_info(orte_sstore_central_local_snapshot_info_t *handle_info )
|
||||
{
|
||||
int ret, exit_status = ORTE_SUCCESS;
|
||||
opal_buffer_t buffer;
|
||||
opal_buffer_t *buffer = NULL;
|
||||
orte_sstore_central_cmd_flag_t command;
|
||||
orte_sstore_central_local_app_snapshot_info_t *app_info = NULL;
|
||||
opal_list_item_t *item = NULL;
|
||||
size_t list_size;
|
||||
|
||||
OBJ_CONSTRUCT(&buffer, opal_buffer_t);
|
||||
buffer = OBJ_NEW(opal_buffer_t);
|
||||
|
||||
command = ORTE_SSTORE_CENTRAL_PUSH;
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &command, 1, ORTE_SSTORE_CENTRAL_CMD )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SSTORE_CENTRAL_CMD))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
list_size = opal_list_get_size(handle_info->app_info_handle);
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &list_size, 1, OPAL_SIZE )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &list_size, 1, OPAL_SIZE))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
@ -932,20 +942,20 @@ static int push_handle_info(orte_sstore_central_local_snapshot_info_t *handle_in
|
||||
item = opal_list_get_next(item) ) {
|
||||
app_info = (orte_sstore_central_local_app_snapshot_info_t*)item;
|
||||
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &(app_info->name), 1, ORTE_NAME )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(app_info->name), 1, ORTE_NAME))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &(app_info->ckpt_skipped), 1, OPAL_BOOL )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(app_info->ckpt_skipped), 1, OPAL_BOOL))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if( !app_info->ckpt_skipped ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &(app_info->crs_comp), 1, OPAL_STRING )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(app_info->crs_comp), 1, OPAL_STRING))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
@ -953,7 +963,7 @@ static int push_handle_info(orte_sstore_central_local_snapshot_info_t *handle_in
|
||||
}
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, &buffer,
|
||||
if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buffer,
|
||||
ORTE_RML_TAG_SSTORE_INTERNAL,
|
||||
orte_rml_send_callback, NULL))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
@ -961,8 +971,14 @@ static int push_handle_info(orte_sstore_central_local_snapshot_info_t *handle_in
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/* buffer should not be released here; the callback releases it */
|
||||
buffer = NULL;
|
||||
|
||||
cleanup:
|
||||
OBJ_DESTRUCT(&buffer);
|
||||
if (NULL != buffer) {
|
||||
OBJ_RELEASE(buffer);
|
||||
buffer = NULL;
|
||||
}
|
||||
|
||||
return exit_status;
|
||||
}
|
||||
|
@ -428,30 +428,30 @@ static orte_sstore_stage_app_snapshot_info_t *find_handle_info(orte_sstore_base_
|
||||
static int pull_handle_info(orte_sstore_stage_app_snapshot_info_t *handle_info )
|
||||
{
|
||||
int ret, exit_status = ORTE_SUCCESS;
|
||||
opal_buffer_t buffer;
|
||||
opal_buffer_t *buffer = NULL;
|
||||
orte_sstore_stage_cmd_flag_t command;
|
||||
orte_std_cntr_t count;
|
||||
orte_sstore_base_handle_t loc_id;
|
||||
|
||||
OBJ_CONSTRUCT(&buffer, opal_buffer_t);
|
||||
buffer = OBJ_NEW(opal_buffer_t);
|
||||
|
||||
/*
|
||||
* Ask the daemon to send us the info that we need
|
||||
*/
|
||||
command = ORTE_SSTORE_STAGE_PULL;
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &command, 1, ORTE_SSTORE_STAGE_CMD )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SSTORE_STAGE_CMD))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_DAEMON, &buffer,
|
||||
if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_DAEMON, buffer,
|
||||
ORTE_RML_TAG_SSTORE_INTERNAL,
|
||||
orte_rml_send_callback, NULL))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
@ -462,8 +462,7 @@ static int pull_handle_info(orte_sstore_stage_app_snapshot_info_t *handle_info )
|
||||
/*
|
||||
* Receive the response
|
||||
*/
|
||||
OBJ_DESTRUCT(&buffer);
|
||||
OBJ_CONSTRUCT(&buffer, opal_buffer_t);
|
||||
buffer = OBJ_NEW(opal_buffer_t);
|
||||
OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
|
||||
"sstore:stage:(app): pull() from %s -> %s",
|
||||
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
|
||||
@ -483,14 +482,14 @@ static int pull_handle_info(orte_sstore_stage_app_snapshot_info_t *handle_info )
|
||||
orte_rml.recv_buffer_nb(ORTE_PROC_MY_DAEMON, ORTE_RML_TAG_SSTORE_INTERNAL,
|
||||
0, orte_rml_recv_callback, NULL);
|
||||
count = 1;
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&buffer, &command, &count, ORTE_SSTORE_STAGE_CMD))) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &command, &count, ORTE_SSTORE_STAGE_CMD))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
count = 1;
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&buffer, &loc_id, &count, ORTE_SSTORE_HANDLE )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &loc_id, &count, ORTE_SSTORE_HANDLE))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
@ -500,35 +499,38 @@ static int pull_handle_info(orte_sstore_stage_app_snapshot_info_t *handle_info )
|
||||
}
|
||||
|
||||
count = 1;
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&buffer, &(handle_info->seq_num), &count, OPAL_INT))) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->seq_num), &count, OPAL_INT))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
count = 1;
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&buffer, &(handle_info->global_ref_name), &count, OPAL_STRING))) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->global_ref_name), &count, OPAL_STRING))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
count = 1;
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&buffer, &(handle_info->local_location), &count, OPAL_STRING))) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->local_location), &count, OPAL_STRING))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
count = 1;
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&buffer, &(handle_info->metadata_filename), &count, OPAL_STRING))) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->metadata_filename), &count, OPAL_STRING))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
cleanup:
|
||||
OBJ_DESTRUCT(&buffer);
|
||||
if (NULL != buffer) {
|
||||
OBJ_RELEASE(buffer);
|
||||
buffer = NULL;
|
||||
}
|
||||
|
||||
return exit_status;
|
||||
}
|
||||
@ -536,48 +538,53 @@ static int pull_handle_info(orte_sstore_stage_app_snapshot_info_t *handle_info )
|
||||
static int push_handle_info(orte_sstore_stage_app_snapshot_info_t *handle_info )
|
||||
{
|
||||
int ret, exit_status = ORTE_SUCCESS;
|
||||
opal_buffer_t buffer;
|
||||
opal_buffer_t *buffer = NULL;
|
||||
orte_sstore_stage_cmd_flag_t command;
|
||||
|
||||
OBJ_CONSTRUCT(&buffer, opal_buffer_t);
|
||||
buffer = OBJ_NEW(opal_buffer_t);
|
||||
|
||||
command = ORTE_SSTORE_STAGE_PUSH;
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &command, 1, ORTE_SSTORE_STAGE_CMD )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SSTORE_STAGE_CMD))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &(handle_info->ckpt_skipped), 1, OPAL_BOOL )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->ckpt_skipped), 1, OPAL_BOOL))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if( !handle_info->ckpt_skipped ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &(handle_info->crs_comp), 1, OPAL_STRING )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->crs_comp), 1, OPAL_STRING))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_DAEMON, &buffer,
|
||||
if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_DAEMON, buffer,
|
||||
ORTE_RML_TAG_SSTORE_INTERNAL,
|
||||
orte_rml_send_callback, NULL))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
/* buffer should not be released here; the callback releases it */
|
||||
buffer = NULL;
|
||||
|
||||
cleanup:
|
||||
OBJ_DESTRUCT(&buffer);
|
||||
if (NULL != buffer) {
|
||||
OBJ_RELEASE(buffer);
|
||||
buffer = NULL;
|
||||
}
|
||||
|
||||
return exit_status;
|
||||
}
|
||||
|
@ -1098,69 +1098,73 @@ static void sstore_stage_global_recv(int status,
|
||||
static int process_local_pull(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_global_snapshot_info_t *handle_info)
|
||||
{
|
||||
int ret, exit_status = ORTE_SUCCESS;
|
||||
opal_buffer_t loc_buffer;
|
||||
opal_buffer_t *loc_buffer = NULL;
|
||||
orte_sstore_stage_cmd_flag_t command;
|
||||
|
||||
/*
|
||||
* Push back the requested information
|
||||
*/
|
||||
OBJ_CONSTRUCT(&loc_buffer, opal_buffer_t);
|
||||
loc_buffer = OBJ_NEW(opal_buffer_t);
|
||||
|
||||
command = ORTE_SSTORE_STAGE_PUSH;
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&loc_buffer, &command, 1, ORTE_SSTORE_STAGE_CMD )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &command, 1, ORTE_SSTORE_STAGE_CMD))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&loc_buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&loc_buffer, &(handle_info->seq_num), 1, OPAL_INT )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->seq_num), 1, OPAL_INT))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&loc_buffer, &(handle_info->ref_name), 1, OPAL_STRING )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->ref_name), 1, OPAL_STRING))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&loc_buffer, &(handle_info->app_local_location_fmt), 1, OPAL_STRING )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->app_local_location_fmt), 1, OPAL_STRING))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if( orte_sstore_stage_enabled_caching ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&loc_buffer, &(handle_info->app_local_cache_location_fmt), 1, OPAL_STRING )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->app_local_cache_location_fmt), 1, OPAL_STRING))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->migrating), 1, OPAL_BOOL )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->migrating), 1, OPAL_BOOL))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(peer, &loc_buffer,
|
||||
ORTE_RML_TAG_SSTORE_INTERNAL,
|
||||
if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(peer, loc_buffer, ORTE_RML_TAG_SSTORE_INTERNAL,
|
||||
orte_rml_send_callback, NULL))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
/* loc_buffer should not be released here; the callback releases it */
|
||||
loc_buffer = NULL;
|
||||
|
||||
cleanup:
|
||||
OBJ_DESTRUCT(&loc_buffer);
|
||||
if (NULL != loc_buffer) {
|
||||
OBJ_RELEASE(loc_buffer);
|
||||
loc_buffer = NULL;
|
||||
}
|
||||
|
||||
return exit_status;
|
||||
}
|
||||
@ -1515,34 +1519,40 @@ static int wait_all_filem(orte_sstore_stage_global_snapshot_info_t *handle_info)
|
||||
static int xcast_remove_all(orte_sstore_stage_global_snapshot_info_t *handle_info)
|
||||
{
|
||||
int ret, exit_status = ORTE_SUCCESS;
|
||||
opal_buffer_t loc_buffer;
|
||||
opal_buffer_t *loc_buffer = NULL;
|
||||
orte_sstore_stage_cmd_flag_t command;
|
||||
|
||||
handle_info->num_procs_done = 0;
|
||||
|
||||
OBJ_CONSTRUCT(&loc_buffer, opal_buffer_t);
|
||||
loc_buffer = OBJ_NEW(opal_buffer_t);
|
||||
|
||||
command = ORTE_SSTORE_STAGE_REMOVE;
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&loc_buffer, &command, 1, ORTE_SSTORE_STAGE_CMD )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &command, 1, ORTE_SSTORE_STAGE_CMD))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&loc_buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if( ORTE_SUCCESS != (ret = orte_grpcomm.xcast(ORTE_PROC_MY_NAME->jobid, &loc_buffer, ORTE_RML_TAG_SSTORE_INTERNAL)) ) {
|
||||
if( ORTE_SUCCESS != (ret = orte_grpcomm.xcast(ORTE_PROC_MY_NAME->jobid, loc_buffer, ORTE_RML_TAG_SSTORE_INTERNAL))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/* loc_buffer should not be released here; the callback releases it */
|
||||
loc_buffer = NULL;
|
||||
|
||||
cleanup:
|
||||
OBJ_DESTRUCT(&loc_buffer);
|
||||
if (NULL != loc_buffer) {
|
||||
OBJ_RELEASE(loc_buffer);
|
||||
loc_buffer = NULL;
|
||||
}
|
||||
|
||||
return exit_status;
|
||||
}
|
||||
|
@ -1243,7 +1243,7 @@ static int process_global_remove(orte_process_name_t* peer, opal_buffer_t* buffe
|
||||
int ret, exit_status = ORTE_SUCCESS;
|
||||
orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
|
||||
opal_list_item_t* item = NULL;
|
||||
opal_buffer_t loc_buffer;
|
||||
opal_buffer_t *loc_buffer = NULL;
|
||||
orte_sstore_stage_cmd_flag_t command;
|
||||
size_t list_size;
|
||||
char * cmd = NULL;
|
||||
@ -1288,30 +1288,29 @@ static int process_global_remove(orte_process_name_t* peer, opal_buffer_t* buffe
|
||||
}
|
||||
}
|
||||
|
||||
OBJ_CONSTRUCT(&loc_buffer, opal_buffer_t);
|
||||
loc_buffer = OBJ_NEW(opal_buffer_t);
|
||||
|
||||
command = ORTE_SSTORE_STAGE_DONE;
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&loc_buffer, &command, 1, ORTE_SSTORE_STAGE_CMD )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &command, 1, ORTE_SSTORE_STAGE_CMD))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&loc_buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
list_size = opal_list_get_size(handle_info->app_info_handle);
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&loc_buffer, &list_size, 1, OPAL_SIZE )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &list_size, 1, OPAL_SIZE))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(peer, &loc_buffer,
|
||||
ORTE_RML_TAG_SSTORE_INTERNAL,
|
||||
if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(peer, loc_buffer, ORTE_RML_TAG_SSTORE_INTERNAL,
|
||||
orte_rml_send_callback, NULL))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
@ -1324,6 +1323,8 @@ static int process_global_remove(orte_process_name_t* peer, opal_buffer_t* buffe
|
||||
ORTE_NAME_PRINT(peer)));
|
||||
|
||||
handle_info->status = SSTORE_LOCAL_DONE;
|
||||
/* loc_buffer should not be released here; the callback releases it */
|
||||
loc_buffer = NULL;
|
||||
|
||||
cleanup:
|
||||
if( NULL != cmd ) {
|
||||
@ -1331,7 +1332,10 @@ static int process_global_remove(orte_process_name_t* peer, opal_buffer_t* buffe
|
||||
cmd = NULL;
|
||||
}
|
||||
|
||||
OBJ_DESTRUCT(&loc_buffer);
|
||||
if (NULL != loc_buffer) {
|
||||
OBJ_RELEASE(loc_buffer);
|
||||
loc_buffer = NULL;
|
||||
}
|
||||
|
||||
return exit_status;
|
||||
}
|
||||
@ -1339,7 +1343,7 @@ static int process_global_remove(orte_process_name_t* peer, opal_buffer_t* buffe
|
||||
static int process_app_pull(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info)
|
||||
{
|
||||
int ret, exit_status = ORTE_SUCCESS;
|
||||
opal_buffer_t loc_buffer;
|
||||
opal_buffer_t *loc_buffer = NULL;
|
||||
orte_sstore_stage_cmd_flag_t command;
|
||||
orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
|
||||
|
||||
@ -1351,55 +1355,60 @@ static int process_app_pull(orte_process_name_t* peer, opal_buffer_t* buffer, or
|
||||
/*
|
||||
* Push back the requested information
|
||||
*/
|
||||
OBJ_CONSTRUCT(&loc_buffer, opal_buffer_t);
|
||||
loc_buffer = OBJ_NEW(opal_buffer_t);
|
||||
|
||||
command = ORTE_SSTORE_STAGE_PUSH;
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&loc_buffer, &command, 1, ORTE_SSTORE_STAGE_CMD )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &command, 1, ORTE_SSTORE_STAGE_CMD))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&loc_buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&loc_buffer, &(handle_info->seq_num), 1, OPAL_INT )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->seq_num), 1, OPAL_INT))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&loc_buffer, &(handle_info->global_ref_name), 1, OPAL_STRING )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->global_ref_name), 1, OPAL_STRING))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&loc_buffer, &(app_info->local_location), 1, OPAL_STRING )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(app_info->local_location), 1, OPAL_STRING))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&loc_buffer, &(app_info->metadata_filename), 1, OPAL_STRING )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(app_info->metadata_filename), 1, OPAL_STRING))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(peer, &loc_buffer,
|
||||
ORTE_RML_TAG_SSTORE_INTERNAL,
|
||||
if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(peer, loc_buffer, ORTE_RML_TAG_SSTORE_INTERNAL,
|
||||
orte_rml_send_callback, NULL))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/* loc_buffer should not be released here; the callback releases it */
|
||||
loc_buffer = NULL;
|
||||
|
||||
cleanup:
|
||||
OBJ_DESTRUCT(&loc_buffer);
|
||||
if (NULL != loc_buffer) {
|
||||
OBJ_RELEASE(loc_buffer);
|
||||
loc_buffer = NULL;
|
||||
}
|
||||
|
||||
return exit_status;
|
||||
}
|
||||
@ -1626,7 +1635,7 @@ static int wait_all_compressed(orte_sstore_stage_local_snapshot_info_t *handle_i
|
||||
static int pull_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info )
|
||||
{
|
||||
int ret, exit_status = ORTE_SUCCESS;
|
||||
opal_buffer_t buffer;
|
||||
opal_buffer_t *buffer = NULL;
|
||||
orte_sstore_stage_cmd_flag_t command;
|
||||
|
||||
/*
|
||||
@ -1640,25 +1649,25 @@ static int pull_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info
|
||||
return ORTE_SUCCESS;
|
||||
}
|
||||
|
||||
OBJ_CONSTRUCT(&buffer, opal_buffer_t);
|
||||
buffer = OBJ_NEW(opal_buffer_t);
|
||||
|
||||
/*
|
||||
* Ask the daemon to send us the info that we need
|
||||
*/
|
||||
command = ORTE_SSTORE_STAGE_PULL;
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &command, 1, ORTE_SSTORE_STAGE_CMD )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SSTORE_STAGE_CMD))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, &buffer,
|
||||
if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buffer,
|
||||
ORTE_RML_TAG_SSTORE_INTERNAL,
|
||||
orte_rml_send_callback, NULL))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
@ -1666,8 +1675,14 @@ static int pull_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/* buffer should not be released here; the callback releases it */
|
||||
buffer = NULL;
|
||||
|
||||
cleanup:
|
||||
OBJ_DESTRUCT(&buffer);
|
||||
if (NULL != buffer) {
|
||||
OBJ_RELEASE(buffer);
|
||||
buffer = NULL;
|
||||
}
|
||||
|
||||
return exit_status;
|
||||
}
|
||||
@ -1675,29 +1690,29 @@ static int pull_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info
|
||||
static int push_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info )
|
||||
{
|
||||
int ret, exit_status = ORTE_SUCCESS;
|
||||
opal_buffer_t buffer;
|
||||
opal_buffer_t *buffer = NULL;
|
||||
orte_sstore_stage_cmd_flag_t command;
|
||||
orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
|
||||
opal_list_item_t *item = NULL;
|
||||
size_t list_size;
|
||||
|
||||
OBJ_CONSTRUCT(&buffer, opal_buffer_t);
|
||||
buffer = OBJ_NEW(opal_buffer_t);
|
||||
|
||||
command = ORTE_SSTORE_STAGE_PUSH;
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &command, 1, ORTE_SSTORE_STAGE_CMD )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SSTORE_STAGE_CMD))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
list_size = opal_list_get_size(handle_info->app_info_handle);
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &list_size, 1, OPAL_SIZE )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &list_size, 1, OPAL_SIZE))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
@ -1711,32 +1726,32 @@ static int push_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info
|
||||
item = opal_list_get_next(item) ) {
|
||||
app_info = (orte_sstore_stage_local_app_snapshot_info_t*)item;
|
||||
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &(app_info->name), 1, ORTE_NAME )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(app_info->name), 1, ORTE_NAME))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &(app_info->ckpt_skipped), 1, OPAL_BOOL )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(app_info->ckpt_skipped), 1, OPAL_BOOL))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if( !app_info->ckpt_skipped ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &(app_info->crs_comp), 1, OPAL_STRING )) ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(app_info->crs_comp), 1, OPAL_STRING))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if( orte_sstore_stage_enabled_compression ) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &(handle_info->compress_comp), 1, OPAL_STRING))) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->compress_comp), 1, OPAL_STRING))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &(handle_info->compress_postfix), 1, OPAL_STRING))) {
|
||||
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->compress_postfix), 1, OPAL_STRING))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
@ -1745,16 +1760,21 @@ static int push_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info
|
||||
}
|
||||
}
|
||||
|
||||
if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, &buffer,
|
||||
ORTE_RML_TAG_SSTORE_INTERNAL,
|
||||
if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buffer, ORTE_RML_TAG_SSTORE_INTERNAL,
|
||||
orte_rml_send_callback, NULL))) {
|
||||
ORTE_ERROR_LOG(ret);
|
||||
exit_status = ret;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/* buffer should not be released here; the callback releases it */
|
||||
buffer = NULL;
|
||||
|
||||
cleanup:
|
||||
OBJ_DESTRUCT(&buffer);
|
||||
if (NULL != buffer) {
|
||||
OBJ_RELEASE(buffer);
|
||||
buffer = NULL;
|
||||
}
|
||||
|
||||
return exit_status;
|
||||
}
|
||||
|
Загрузка…
Ссылка в новой задаче
Block a user