1
1

SNAPC: use dynamic buffers for rml.send and rml.recv

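The snapc component was still passing stack-constructed opal_buffer_t
objects (OBJ_CONSTRUCT/OBJ_DESTRUCT) to send_buffer_nb(). This patch
changes each "opal_buffer_t buffer;" to "opal_buffer_t *buffer;",
allocates it with OBJ_NEW(), and releases it with OBJ_RELEASE() on the
error paths; after a successful non-blocking send the buffer is released
by the send callback instead of by the caller.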

This commit was SVN r30484.
Этот коммит содержится в:
Adrian Reber 2014-01-29 19:58:33 +00:00
родитель 7bf4c425ff
Коммит 2900f24b67
3 изменённых файлов: 115 добавлений и 113 удалений

Просмотреть файл

@ -115,7 +115,7 @@ int app_coord_init()
opal_cr_notify_callback_fn_t prev_notify_func;
orte_snapc_full_cmd_flag_t command = ORTE_SNAPC_FULL_REQUEST_OP_CMD;
orte_snapc_base_request_op_event_t op_event = ORTE_SNAPC_OP_INIT;
opal_buffer_t *buffer;
opal_buffer_t *buffer = NULL;
orte_grpcomm_collective_t *coll;
OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
@ -221,7 +221,7 @@ int app_coord_finalize()
int ret, exit_status = ORTE_SUCCESS;
orte_snapc_full_cmd_flag_t command = ORTE_SNAPC_FULL_REQUEST_OP_CMD;
orte_snapc_base_request_op_event_t op_event = ORTE_SNAPC_OP_FIN;
opal_buffer_t *buffer;
opal_buffer_t *buffer = NULL;
orte_std_cntr_t count;
orte_grpcomm_collective_t *coll;
@ -298,17 +298,18 @@ int app_coord_finalize()
goto cleanup;
}
#endif /* ENABLE_FT_FIXED */
orte_rml.recv_buffer_nb(ORTE_PROC_MY_HNP, ORTE_RML_TAG_SNAPC_FULL, 0, snapc_full_app_callback_recv, NULL);
orte_rml.recv_buffer_nb(ORTE_PROC_MY_HNP, ORTE_RML_TAG_SNAPC_FULL, 0, snapc_full_app_callback_recv, buffer);
/* wait for completion */
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&buffer, &command, &count, ORTE_SNAPC_FULL_CMD))) {
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &command, &count, ORTE_SNAPC_FULL_CMD))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&buffer, &op_event, &count, OPAL_INT))) {
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &op_event, &count, OPAL_INT))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
@ -332,6 +333,11 @@ int app_coord_finalize()
cleanup:
/* cleanup */
if (NULL != buffer) {
OBJ_RELEASE(buffer);
buffer = NULL;
}
OBJ_RELEASE(coll);
/*
@ -347,7 +353,7 @@ int app_coord_finalize()
app_comm_pipe_w = NULL;
}
return ORTE_SUCCESS;
return exit_status;
}
/******************
@ -821,24 +827,24 @@ static int app_notify_resp_stage_3(int cr_state, bool skip_fin_msg)
static int snapc_full_app_finished_msg(int cr_state) {
int ret, exit_status = ORTE_SUCCESS;
opal_buffer_t buffer;
opal_buffer_t *buffer = NULL;
orte_snapc_cmd_flag_t command = ORTE_SNAPC_LOCAL_FINISH_CMD;
OBJ_CONSTRUCT(&buffer, opal_buffer_t);
buffer = OBJ_NEW(opal_buffer_t);
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &command, 1, ORTE_SNAPC_CMD )) ) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SNAPC_CMD))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &cr_state, 1, OPAL_INT))) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &cr_state, 1, OPAL_INT))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_DAEMON, &buffer,
if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_DAEMON, buffer,
ORTE_RML_TAG_SNAPC,
orte_rml_send_callback, 0))) {
ORTE_ERROR_LOG(ret);
@ -846,8 +852,9 @@ static int snapc_full_app_finished_msg(int cr_state) {
goto cleanup;
}
return ORTE_SUCCESS;
cleanup:
OBJ_DESTRUCT(&buffer);
OBJ_RELEASE(buffer);
return exit_status;
}
@ -1241,39 +1248,39 @@ int app_coord_ft_event(int state) {
static int snapc_full_app_ft_event_update_process_info(orte_process_name_t proc, pid_t proc_pid)
{
int ret, exit_status = ORTE_SUCCESS;
opal_buffer_t buffer;
opal_buffer_t *buffer = NULL;
orte_snapc_cmd_flag_t command = ORTE_SNAPC_LOCAL_UPDATE_CMD;
OBJ_CONSTRUCT(&buffer, opal_buffer_t);
buffer = OBJ_NEW(opal_buffer_t);
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &command, 1, ORTE_SNAPC_CMD )) ) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SNAPC_CMD))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
/* JJH CLEANUP: Do we really need this, it is equal to sender */
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &proc, 1, ORTE_NAME))) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &proc, 1, ORTE_NAME))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &proc_pid, 1, OPAL_PID))) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &proc_pid, 1, OPAL_PID))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
#if OPAL_ENABLE_CRDEBUG == 1
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &MPIR_debug_with_checkpoint, 1, OPAL_BOOL))) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &MPIR_debug_with_checkpoint, 1, OPAL_BOOL))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
#endif
if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_DAEMON, &buffer,
if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_DAEMON, buffer,
ORTE_RML_TAG_SNAPC,
orte_rml_send_callback, 0))) {
ORTE_ERROR_LOG(ret);
@ -1281,8 +1288,9 @@ static int snapc_full_app_ft_event_update_process_info(orte_process_name_t proc,
goto cleanup;
}
return ORTE_SUCCESS;
cleanup:
OBJ_DESTRUCT(&buffer);
OBJ_RELEASE(buffer);
return exit_status;
}
@ -1291,7 +1299,7 @@ int app_coord_request_op(orte_snapc_base_request_op_t *datum)
{
int ret, exit_status = ORTE_SUCCESS;
orte_snapc_full_cmd_flag_t command = ORTE_SNAPC_FULL_REQUEST_OP_CMD;
opal_buffer_t buffer;
opal_buffer_t *buffer = NULL;
orte_std_cntr_t count;
int op_event, op_state;
char *seq_str = NULL, *tmp_str = NULL;
@ -1376,39 +1384,34 @@ int app_coord_request_op(orte_snapc_base_request_op_t *datum)
/*
* Send request to HNP
*/
OBJ_CONSTRUCT(&buffer, opal_buffer_t);
buffer = OBJ_NEW(opal_buffer_t);
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
OBJ_DESTRUCT(&buffer);
goto cleanup;
}
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &(ORTE_PROC_MY_NAME->jobid), 1, ORTE_JOBID))) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(ORTE_PROC_MY_NAME->jobid), 1, ORTE_JOBID))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
OBJ_DESTRUCT(&buffer);
goto cleanup;
}
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &(datum->event), 1, OPAL_INT))) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(datum->event), 1, OPAL_INT))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
OBJ_DESTRUCT(&buffer);
goto cleanup;
}
if( ORTE_SNAPC_OP_RESTART == datum->event) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &(datum->seq_num), 1, OPAL_INT))) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(datum->seq_num), 1, OPAL_INT))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
OBJ_DESTRUCT(&buffer);
goto cleanup;
}
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &(datum->global_handle), 1, OPAL_STRING))) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(datum->global_handle), 1, OPAL_STRING))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
OBJ_DESTRUCT(&buffer);
goto cleanup;
}
}
@ -1438,10 +1441,9 @@ int app_coord_request_op(orte_snapc_base_request_op_t *datum)
/*
* Send information
*/
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &(datum->mig_num), 1, OPAL_INT))) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(datum->mig_num), 1, OPAL_INT))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
OBJ_DESTRUCT(&buffer);
goto cleanup;
}
@ -1455,17 +1457,15 @@ int app_coord_request_op(orte_snapc_base_request_op_t *datum)
(OPAL_INT_TO_BOOL((datum->mig_off_node)[i]) ? 'T' : 'F')
));
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &((datum->mig_vpids)[i]), 1, OPAL_INT))) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &((datum->mig_vpids)[i]), 1, OPAL_INT))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
OBJ_DESTRUCT(&buffer);
goto cleanup;
}
tmp_str = strdup((datum->mig_host_pref)[i]);
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &tmp_str, 1, OPAL_STRING))) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &tmp_str, 1, OPAL_STRING))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
OBJ_DESTRUCT(&buffer);
goto cleanup;
}
if( NULL != tmp_str ) {
@ -1473,31 +1473,27 @@ int app_coord_request_op(orte_snapc_base_request_op_t *datum)
tmp_str = NULL;
}
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &((datum->mig_vpid_pref)[i]), 1, OPAL_INT))) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &((datum->mig_vpid_pref)[i]), 1, OPAL_INT))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
OBJ_DESTRUCT(&buffer);
goto cleanup;
}
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &((datum->mig_off_node)[i]), 1, OPAL_INT))) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &((datum->mig_off_node)[i]), 1, OPAL_INT))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
OBJ_DESTRUCT(&buffer);
goto cleanup;
}
}
}
if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, &buffer,
ORTE_RML_TAG_SNAPC_FULL,
if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buffer, ORTE_RML_TAG_SNAPC_FULL,
orte_rml_send_callback, 0))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
OBJ_DESTRUCT(&buffer);
goto cleanup;
}
OBJ_DESTRUCT(&buffer);
/* buffer should not be released here; the callback releases it */
buffer = NULL;
}
/*
@ -1539,7 +1535,7 @@ int app_coord_request_op(orte_snapc_base_request_op_t *datum)
goto cleanup;
}
OBJ_CONSTRUCT(&buffer, opal_buffer_t);
buffer = OBJ_NEW(opal_buffer_t);
/*
* Wait for a response regarding completion
@ -1553,31 +1549,30 @@ int app_coord_request_op(orte_snapc_base_request_op_t *datum)
goto cleanup;
}
#endif /* ENABLE_FT_FIXED */
orte_rml.recv_buffer_nb(ORTE_PROC_MY_HNP, ORTE_RML_TAG_SNAPC_FULL, 0, snapc_full_app_callback_recv, NULL);
orte_rml.recv_buffer_nb(ORTE_PROC_MY_HNP, ORTE_RML_TAG_SNAPC_FULL, 0, snapc_full_app_callback_recv, buffer);
/* wait for completion */
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&buffer, &command, &count, ORTE_SNAPC_FULL_CMD))) {
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &command, &count, ORTE_SNAPC_FULL_CMD))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&buffer, &op_event, &count, OPAL_INT))) {
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &op_event, &count, OPAL_INT))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&buffer, &op_state, &count, OPAL_INT))) {
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &op_state, &count, OPAL_INT))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
OBJ_DESTRUCT(&buffer);
orte_sstore.get_attr(last_ss_handle,
SSTORE_METADATA_GLOBAL_SNAP_SEQ,
&seq_str);
@ -1613,7 +1608,7 @@ int app_coord_request_op(orte_snapc_base_request_op_t *datum)
"App) Request_op: Leader waiting for Migrate release (%3d)...",
datum->event));
OBJ_CONSTRUCT(&buffer, opal_buffer_t);
buffer = OBJ_NEW(opal_buffer_t);
/*
* Wait for a response regarding completion
@ -1627,31 +1622,30 @@ int app_coord_request_op(orte_snapc_base_request_op_t *datum)
goto cleanup;
}
#endif /* ENABLE_FT_FIXED */
orte_rml.recv_buffer_nb(ORTE_PROC_MY_HNP, ORTE_RML_TAG_SNAPC_FULL, 0, snapc_full_app_callback_recv, NULL);
orte_rml.recv_buffer_nb(ORTE_PROC_MY_HNP, ORTE_RML_TAG_SNAPC_FULL, 0, snapc_full_app_callback_recv, buffer);
/* wait for completion */
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&buffer, &command, &count, ORTE_SNAPC_FULL_CMD))) {
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &command, &count, ORTE_SNAPC_FULL_CMD))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&buffer, &op_event, &count, OPAL_INT))) {
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &op_event, &count, OPAL_INT))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
count = 1;
if (ORTE_SUCCESS != (ret = opal_dss.unpack(&buffer, &op_state, &count, OPAL_INT))) {
if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &op_state, &count, OPAL_INT))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
OBJ_DESTRUCT(&buffer);
OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
"App) Request_op: Leader continuing from Migration (%3d)...",
datum->event));
@ -1685,6 +1679,11 @@ int app_coord_request_op(orte_snapc_base_request_op_t *datum)
cleanup:
if (NULL != buffer) {
OBJ_RELEASE(buffer);
buffer = NULL;
}
if( NULL != seq_str ) {
free(seq_str);
seq_str = NULL;

Просмотреть файл

@ -1170,7 +1170,7 @@ static void snapc_full_process_request_op_cmd(orte_process_name_t* sender,
orte_jobid_t jobid;
int op_event, op_state;
opal_crs_base_ckpt_options_t *options = NULL;
opal_buffer_t buffer;
opal_buffer_t *buffer = NULL;
orte_snapc_full_cmd_flag_t command = ORTE_SNAPC_FULL_REQUEST_OP_CMD;
int seq_num = -1, i;
char * global_handle = NULL, *tmp_str = NULL;
@ -1231,26 +1231,25 @@ static void snapc_full_process_request_op_cmd(orte_process_name_t* sender,
OPAL_OUTPUT_VERBOSE((3, mca_snapc_full_component.super.output_handle,
"Global) process_request_op(): Send Finalize ACK to the job"));
OBJ_CONSTRUCT(&buffer, opal_buffer_t);
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
buffer = OBJ_NEW(opal_buffer_t);
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
ORTE_ERROR_LOG(ret);
goto cleanup;
}
op_event = ORTE_SNAPC_OP_FIN_ACK;
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &op_event, 1, OPAL_INT))) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &op_event, 1, OPAL_INT))) {
ORTE_ERROR_LOG(ret);
goto cleanup;
}
if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(sender, &buffer,
ORTE_RML_TAG_SNAPC_FULL,
if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(sender, buffer, ORTE_RML_TAG_SNAPC_FULL,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(ret);
/* FIXME: buffer not cleaned up */
goto cleanup;
}
OBJ_DESTRUCT(&buffer);
/* buffer should not be released here; the callback releases it */
buffer = NULL;
}
/************************************
* Start a checkpoint operation
@ -1283,30 +1282,29 @@ static void snapc_full_process_request_op_cmd(orte_process_name_t* sender,
/*
* Tell the sender that the operation is finished
*/
OBJ_CONSTRUCT(&buffer, opal_buffer_t);
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
buffer = OBJ_NEW(opal_buffer_t);
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
ORTE_ERROR_LOG(ret);
goto cleanup;
}
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &op_event, 1, OPAL_INT))) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &op_event, 1, OPAL_INT))) {
ORTE_ERROR_LOG(ret);
goto cleanup;
}
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &op_state, 1, OPAL_INT))) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &op_state, 1, OPAL_INT))) {
ORTE_ERROR_LOG(ret);
goto cleanup;
}
if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(sender, &buffer,
ORTE_RML_TAG_SNAPC_FULL,
if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(sender, buffer, ORTE_RML_TAG_SNAPC_FULL,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(ret);
/* FIXME: buffer not cleaned up */
goto cleanup;
}
OBJ_DESTRUCT(&buffer);
/* buffer should not be released here; the callback releases it */
buffer = NULL;
}
/************************************
* Start the Restart operation
@ -1426,31 +1424,28 @@ static void snapc_full_process_request_op_cmd(orte_process_name_t* sender,
OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
"Global) ------ Finished Migration. Release processes (%15s )-----",
ORTE_NAME_PRINT(sender) ));
OBJ_CONSTRUCT(&buffer, opal_buffer_t);
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
buffer = OBJ_NEW(opal_buffer_t);
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
ORTE_ERROR_LOG(ret);
goto cleanup;
}
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &op_event, 1, OPAL_INT))) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &op_event, 1, OPAL_INT))) {
ORTE_ERROR_LOG(ret);
goto cleanup;
}
op_state = 0;
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &op_state, 1, OPAL_INT))) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &op_state, 1, OPAL_INT))) {
ORTE_ERROR_LOG(ret);
goto cleanup;
}
if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(sender, &buffer,
ORTE_RML_TAG_SNAPC_FULL,
if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(sender, buffer, ORTE_RML_TAG_SNAPC_FULL,
orte_rml_send_callback, NULL))) {
ORTE_ERROR_LOG(ret);
/* FIXME: buffer not cleaned up */
goto cleanup;
}
OBJ_DESTRUCT(&buffer);
OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
"Global) ------ Finished Migration. Released processes (%15s )-----",
@ -1504,8 +1499,12 @@ static void snapc_full_process_request_op_cmd(orte_process_name_t* sender,
op_event));
}
cleanup:
if (NULL != buffer) {
OBJ_RELEASE(buffer);
buffer = NULL;
}
cleanup:
if( NULL != options ) {
OBJ_RELEASE(options);
options = NULL;
@ -2105,7 +2104,7 @@ static int orte_snapc_full_global_set_job_ckpt_info( orte_jobid_t jobid,
{
int ret, exit_status = ORTE_SUCCESS;
orte_snapc_full_cmd_flag_t command;
opal_buffer_t buffer;
opal_buffer_t *buffer = NULL;
char * state_str = NULL;
orte_proc_t *proc = NULL;
opal_list_item_t *item = NULL;
@ -2114,7 +2113,7 @@ static int orte_snapc_full_global_set_job_ckpt_info( orte_jobid_t jobid,
/*
* Update all Local Coordinators (broadcast operation)
*/
OBJ_CONSTRUCT(&buffer, opal_buffer_t);
buffer = OBJ_NEW(opal_buffer_t);
if( quick ) {
command = ORTE_SNAPC_FULL_UPDATE_JOB_STATE_QUICK_CMD;
@ -2122,19 +2121,19 @@ static int orte_snapc_full_global_set_job_ckpt_info( orte_jobid_t jobid,
command = ORTE_SNAPC_FULL_UPDATE_JOB_STATE_CMD;
}
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &jobid, 1, ORTE_JOBID))) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &jobid, 1, ORTE_JOBID))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &ckpt_state, 1, OPAL_INT))) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &ckpt_state, 1, OPAL_INT))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
@ -2144,19 +2143,19 @@ static int orte_snapc_full_global_set_job_ckpt_info( orte_jobid_t jobid,
goto process_msg;
}
if (ORTE_SUCCESS != (ret = orte_sstore.pack_handle(NULL, &buffer, handle))) {
if (ORTE_SUCCESS != (ret = orte_sstore.pack_handle(NULL, buffer, handle))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
if( ORTE_SUCCESS != (ret = orte_snapc_base_pack_options(&buffer, options)) ) {
if(ORTE_SUCCESS != (ret = orte_snapc_base_pack_options(buffer, options))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &(currently_migrating), 1, OPAL_BOOL))) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(currently_migrating), 1, OPAL_BOOL))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
@ -2165,7 +2164,7 @@ static int orte_snapc_full_global_set_job_ckpt_info( orte_jobid_t jobid,
if( currently_migrating ) {
num_procs = opal_list_get_size(migrating_procs);
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &num_procs, 1, OPAL_SIZE))) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &num_procs, 1, OPAL_SIZE))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
@ -2175,7 +2174,7 @@ static int orte_snapc_full_global_set_job_ckpt_info( orte_jobid_t jobid,
item != opal_list_get_end(migrating_procs);
item = opal_list_get_next(item)) {
proc = (orte_proc_t*)item;
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &(proc->name), 1, ORTE_NAME))) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(proc->name), 1, ORTE_NAME))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
@ -2191,7 +2190,7 @@ static int orte_snapc_full_global_set_job_ckpt_info( orte_jobid_t jobid,
free(state_str);
state_str = NULL;
if( ORTE_SUCCESS != (ret = orte_grpcomm.xcast(ORTE_PROC_MY_NAME->jobid, &buffer, ORTE_RML_TAG_SNAPC_FULL)) ) {
if( ORTE_SUCCESS != (ret = orte_grpcomm.xcast(ORTE_PROC_MY_NAME->jobid, buffer, ORTE_RML_TAG_SNAPC_FULL))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
@ -2207,7 +2206,7 @@ static int orte_snapc_full_global_set_job_ckpt_info( orte_jobid_t jobid,
state_str = NULL;
}
OBJ_DESTRUCT(&buffer);
OBJ_RELEASE(buffer);
return exit_status;
}

Просмотреть файл

@ -626,7 +626,7 @@ static int snapc_full_local_send_restart_proc_info(void)
int ret, exit_status = ORTE_SUCCESS;
orte_snapc_full_app_snapshot_t *vpid_snapshot = NULL;
opal_list_item_t* item = NULL;
opal_buffer_t buffer;
opal_buffer_t *buffer = NULL;
orte_snapc_full_cmd_flag_t command = ORTE_SNAPC_FULL_RESTART_PROC_INFO;
size_t num_vpids = 0;
@ -647,6 +647,7 @@ static int snapc_full_local_send_restart_proc_info(void)
return ORTE_SUCCESS;
}
buffer = OBJ_NEW(opal_buffer_t);
/*
* Local Coordinator: Send Global Coordinator the information
* [ hostname, num_pids, {pids} ]
@ -656,21 +657,19 @@ static int snapc_full_local_send_restart_proc_info(void)
return exit_status;
}
OBJ_CONSTRUCT(&buffer, opal_buffer_t);
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &(orte_process_info.nodename), 1, OPAL_STRING))) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(orte_process_info.nodename), 1, OPAL_STRING))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &num_vpids, 1, OPAL_SIZE))) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &num_vpids, 1, OPAL_SIZE))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
@ -681,7 +680,7 @@ static int snapc_full_local_send_restart_proc_info(void)
item = opal_list_get_next(item) ) {
vpid_snapshot = (orte_snapc_full_app_snapshot_t*)item;
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &(vpid_snapshot->process_pid), 1, OPAL_PID))) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(vpid_snapshot->process_pid), 1, OPAL_PID))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
@ -689,14 +688,17 @@ static int snapc_full_local_send_restart_proc_info(void)
}
if (0 > (ret = orte_rml.send_buffer(ORTE_PROC_MY_HNP, &buffer, ORTE_RML_TAG_SNAPC_FULL, 0))) {
if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buffer, ORTE_RML_TAG_SNAPC_FULL,
orte_rml_send_callback, 0))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
return ORTE_SUCCESS;
cleanup:
OBJ_DESTRUCT(&buffer);
OBJ_RELEASE(buffer);
return exit_status;
}
@ -1309,26 +1311,26 @@ static int local_define_pipe_names(orte_snapc_full_app_snapshot_t *vpid_snapshot
static int snapc_full_local_update_coord(int state, bool quick)
{
int ret, exit_status = ORTE_SUCCESS;
opal_buffer_t buffer;
opal_buffer_t *buffer = NULL;
orte_snapc_full_cmd_flag_t command;
/*
* Local Coordinator: Send Global Coordinator state information
*/
OBJ_CONSTRUCT(&buffer, opal_buffer_t);
buffer = OBJ_NEW(opal_buffer_t);
if( quick ) {
command = ORTE_SNAPC_FULL_UPDATE_ORTED_STATE_QUICK_CMD;
} else {
command = ORTE_SNAPC_FULL_UPDATE_ORTED_STATE_CMD;
}
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &command, 1, ORTE_SNAPC_FULL_CMD )) ) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
}
if (ORTE_SUCCESS != (ret = opal_dss.pack(&buffer, &state, 1, OPAL_INT))) {
if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &state, 1, OPAL_INT))) {
ORTE_ERROR_LOG(ret);
exit_status = ret;
goto cleanup;
@ -1345,7 +1347,7 @@ static int snapc_full_local_update_coord(int state, bool quick)
}
send_data:
if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, &buffer,
if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buffer,
ORTE_RML_TAG_SNAPC_FULL,
orte_rml_send_callback, 0))) {
ORTE_ERROR_LOG(ret);
@ -1353,8 +1355,10 @@ static int snapc_full_local_update_coord(int state, bool quick)
goto cleanup;
}
return ORTE_SUCCESS;
cleanup:
OBJ_DESTRUCT(&buffer);
OBJ_RELEASE(buffer);
return exit_status;
}