diff --git a/orte/mca/gpr/replica/functional_layer/gpr_replica_fn.h b/orte/mca/gpr/replica/functional_layer/gpr_replica_fn.h index 04223b61f8..41c0486bc2 100644 --- a/orte/mca/gpr/replica/functional_layer/gpr_replica_fn.h +++ b/orte/mca/gpr/replica/functional_layer/gpr_replica_fn.h @@ -155,7 +155,8 @@ int orte_gpr_replica_add_keyval(orte_gpr_replica_itagval_t **ivalptr, orte_gpr_replica_container_t *cptr, orte_gpr_keyval_t *kptr); -int orte_gpr_replica_update_keyval(orte_gpr_replica_segment_t *seg, +int orte_gpr_replica_update_keyval(orte_gpr_replica_itagval_t **iptr, + orte_gpr_replica_segment_t *seg, orte_gpr_replica_container_t *cptr, orte_gpr_keyval_t *kptr); diff --git a/orte/mca/gpr/replica/functional_layer/gpr_replica_put_get_fn.c b/orte/mca/gpr/replica/functional_layer/gpr_replica_put_get_fn.c index aad0f181c9..1b8028ab62 100644 --- a/orte/mca/gpr/replica/functional_layer/gpr_replica_put_get_fn.c +++ b/orte/mca/gpr/replica/functional_layer/gpr_replica_put_get_fn.c @@ -129,12 +129,11 @@ int orte_gpr_replica_put_fn(orte_gpr_addr_mode_t addr_mode, orte_gpr_replica_container_t **cptr, *cptr2; orte_gpr_replica_itag_t itag; orte_gpr_replica_addr_mode_t tok_mode; - orte_gpr_keyval_t **kptr; - orte_gpr_replica_itagval_t *iptr; - bool overwrite; + orte_gpr_replica_itagval_t *iptr, **iptrs; + bool overwrite, overwritten; char *tmp=NULL; int rc; - size_t i, j, k; + size_t i, j, k, m, n, index; if (orte_gpr_replica_globals.debug) { opal_output(0, "[%lu,%lu,%lu] gpr replica: put entered on segment %s\nValues:", @@ -152,6 +151,8 @@ int orte_gpr_replica_put_fn(orte_gpr_addr_mode_t addr_mode, /* initialize storage for actions taken */ orte_pointer_array_clear(orte_gpr_replica_globals.acted_upon); orte_gpr_replica_globals.num_acted_upon = 0; + orte_pointer_array_clear(orte_gpr_replica_globals.overwritten); + orte_gpr_replica_globals.num_overwritten = 0; /* extract the token address mode and overwrite permissions */ overwrite = false; @@ -183,7 +184,6 @@ int orte_gpr_replica_put_fn(orte_gpr_addr_mode_t addr_mode, } /* ok, store all the keyvals in the container */ - kptr = keyvals; for (i=0; i < cnt; i++) { if (ORTE_SUCCESS != (rc = orte_gpr_replica_add_keyval(&iptr, seg, cptr2, keyvals[i]))) { ORTE_ERROR_LOG(rc); @@ -199,6 +199,7 @@ int orte_gpr_replica_put_fn(orte_gpr_addr_mode_t addr_mode, } else { /* otherwise, go through list of containers. For each one, see if entry already exists in container - overwrite if allowed */ cptr = (orte_gpr_replica_container_t**)(orte_gpr_replica_globals.srch_cptr)->addr; + iptrs = (orte_gpr_replica_itagval_t**)(orte_gpr_replica_globals.overwritten)->addr; for (j=0, k=0; k < orte_gpr_replica_globals.num_srch_cptr && j < (orte_gpr_replica_globals.srch_cptr)->size; j++) { if (NULL != cptr[j]) { @@ -213,14 +214,44 @@ int orte_gpr_replica_put_fn(orte_gpr_addr_mode_t addr_mode, * else add this keyval to the container as a new entry */ if (overwrite) { - if (ORTE_SUCCESS != (rc = orte_gpr_replica_update_keyval(seg, cptr[j], keyvals[i]))) { - return rc; - } - /* action is recorded in update function - don't do it here */ - /* turn off the overwrite flag so that any subsequent entries are - * added - otherwise, only the last value provided would be retained! + /* check to see if we have already overwritten this keyval. if so, + * then we add the remaining values - otherwise, only the + * last value provided would be retained! */ - overwrite = false; + overwritten = false; + for (m=0, n=0; !overwritten && + n < orte_gpr_replica_globals.num_overwritten && + m < (orte_gpr_replica_globals.overwritten)->size; m++) { + if (NULL != iptrs[m]) { + n++; + if (iptrs[m]->itag == itag) { + /* keyval was previously overwritten */ + if (ORTE_SUCCESS != (rc = orte_gpr_replica_add_keyval(&iptr, seg, cptr[j], keyvals[i]))) { + return rc; + } + /* record that we did this */ + if (ORTE_SUCCESS != (rc = orte_gpr_replica_record_action(seg, cptr[j], iptr, ORTE_GPR_REPLICA_ENTRY_CHANGED))) { + ORTE_ERROR_LOG(rc); + return rc; + } + overwritten = true; + } + } + } + if (!overwritten) { + /* must not have been previously overwritten - go + * ahead and overwrite it now + */ + if (ORTE_SUCCESS != (rc = orte_gpr_replica_update_keyval(&iptr, seg, cptr[j], keyvals[i]))) { + return rc; + } + /* record the ival so we don't do it again */ + if (0 > orte_pointer_array_add(&index, orte_gpr_replica_globals.overwritten, (void*)iptr)) { + ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE); + return ORTE_ERR_OUT_OF_RESOURCE; + } + (orte_gpr_replica_globals.num_overwritten)++; + } } else { if (ORTE_SUCCESS != (rc = orte_gpr_replica_add_keyval(&iptr, seg, cptr[j], keyvals[i]))) { return rc; diff --git a/orte/mca/gpr/replica/functional_layer/gpr_replica_segment_fn.c b/orte/mca/gpr/replica/functional_layer/gpr_replica_segment_fn.c index f63e5bfc26..10e61a63c3 100644 --- a/orte/mca/gpr/replica/functional_layer/gpr_replica_segment_fn.c +++ b/orte/mca/gpr/replica/functional_layer/gpr_replica_segment_fn.c @@ -194,6 +194,17 @@ int orte_gpr_replica_delete_itagval(orte_gpr_replica_segment_t *seg, return rc; } + /* remove the itag value from the container's list */ + for (i=0; i < orte_value_array_get_size(&(cptr->itaglist)); i++) { + if (iptr->itag == ORTE_VALUE_ARRAY_GET_ITEM(&(cptr->itaglist), orte_gpr_replica_itag_t, i)) { + orte_value_array_remove_item(&(cptr->itaglist), i); + goto MOVEON; + } + } + ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); + return ORTE_ERR_NOT_FOUND; + +MOVEON: /* release the data storage */ i = iptr->index; OBJ_RELEASE(iptr); @@ -206,35 +217,45 @@ int orte_gpr_replica_delete_itagval(orte_gpr_replica_segment_t *seg, } -int orte_gpr_replica_update_keyval(orte_gpr_replica_segment_t *seg, +int orte_gpr_replica_update_keyval(orte_gpr_replica_itagval_t **iptr2, + orte_gpr_replica_segment_t *seg, orte_gpr_replica_container_t *cptr, orte_gpr_keyval_t *kptr) { - size_t i; + size_t i, j, k; int rc; orte_pointer_array_t *ptr; orte_gpr_replica_itagval_t *iptr; ptr = orte_gpr_replica_globals.srch_ival; + /* record the error value */ + *iptr2 = NULL; + /* for each item in the search array, delete it */ for (i=0; i < ptr->size; i++) { if (NULL != ptr->addr[i]) { iptr = (orte_gpr_replica_itagval_t*)ptr->addr[i]; /* release the data storage */ - i = iptr->index; - if (ORTE_SUCCESS != (rc = orte_gpr_replica_record_action(seg, cptr, iptr, - ORTE_GPR_REPLICA_ENTRY_CHANGED | - ORTE_GPR_REPLICA_ENTRY_CHG_FRM))) { - ORTE_ERROR_LOG(rc); - return rc; - } - /* MUST DO THE RELEASE AFTER RECORDING THE ACTION SO THAT - * THE OBJECT DOESN'T ACTUALLY LEAVE UNTIL THE ACTION IS PROCESSED + j = iptr->index; + /* DON'T RECORD THE ACTION - THIS WILL PREVENT US FROM SENDING + * BOTH THE OLD AND THE NEW DATA BACK ON A SUBSCRIPTION + * REQUEST */ + /* remove the itag value from the container's list */ + for (k=0; k < orte_value_array_get_size(&(cptr->itaglist)); k++) { + if (iptr->itag == ORTE_VALUE_ARRAY_GET_ITEM(&(cptr->itaglist), orte_gpr_replica_itag_t, k)) { + orte_value_array_remove_item(&(cptr->itaglist), k); + goto MOVEON; + } + } + ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); + return ORTE_ERR_NOT_FOUND; + +MOVEON: OBJ_RELEASE(iptr); /* remove the entry from the container's itagval array */ - orte_pointer_array_set_item(cptr->itagvals, i, NULL); + orte_pointer_array_set_item(cptr->itagvals, j, NULL); (cptr->num_itagvals)--; } } @@ -260,6 +281,9 @@ int orte_gpr_replica_update_keyval(orte_gpr_replica_segment_t *seg, return rc; } + /* return the location of the new iptr */ + *iptr2 = iptr; + return ORTE_SUCCESS; } diff --git a/orte/mca/gpr/replica/gpr_replica.h b/orte/mca/gpr/replica/gpr_replica.h index 56cf344af3..85d7cca0dc 100644 --- a/orte/mca/gpr/replica/gpr_replica.h +++ b/orte/mca/gpr/replica/gpr_replica.h @@ -102,6 +102,8 @@ typedef struct { size_t trig_cntr; size_t num_srch_cptr; orte_pointer_array_t *srch_cptr; + size_t num_overwritten; + orte_pointer_array_t *overwritten; orte_pointer_array_t *sub_ptrs; size_t num_srch_ival; orte_pointer_array_t *srch_ival; diff --git a/orte/mca/gpr/replica/gpr_replica_component.c b/orte/mca/gpr/replica/gpr_replica_component.c index 509d94ba8a..be8562719b 100644 --- a/orte/mca/gpr/replica/gpr_replica_component.c +++ b/orte/mca/gpr/replica/gpr_replica_component.c @@ -244,6 +244,13 @@ orte_gpr_base_module_t *orte_gpr_replica_init(bool *allow_multi_user_threads, bo } orte_gpr_replica_globals.num_srch_cptr = 0; + if (ORTE_SUCCESS != (rc = orte_pointer_array_init(&(orte_gpr_replica_globals.overwritten), + 20, orte_gpr_replica_globals.max_size, 20))) { + ORTE_ERROR_LOG(rc); + return NULL; + } + orte_gpr_replica_globals.num_overwritten = 0; + if (ORTE_SUCCESS != (rc = orte_pointer_array_init(&(orte_gpr_replica_globals.srch_ival), 100, orte_gpr_replica_globals.max_size, 100))) { ORTE_ERROR_LOG(rc); @@ -337,6 +344,10 @@ int orte_gpr_replica_finalize(void) OBJ_RELEASE(orte_gpr_replica_globals.srch_cptr); } + if (NULL != orte_gpr_replica_globals.overwritten) { + OBJ_RELEASE(orte_gpr_replica_globals.overwritten); + } + if (NULL != orte_gpr_replica_globals.srch_ival) { OBJ_RELEASE(orte_gpr_replica_globals.srch_ival); } diff --git a/orte/tools/orted/orted.c b/orte/tools/orted/orted.c index df6ccff72e..97bd476965 100644 --- a/orte/tools/orted/orted.c +++ b/orte/tools/orted/orted.c @@ -171,7 +171,7 @@ int main(int argc, char *argv[]) memset(&orted_globals, 0, sizeof(orted_globals_t)); cmd_line = OBJ_NEW(opal_cmd_line_t); opal_cmd_line_create(cmd_line, orte_cmd_line_opts); - if (OMPI_SUCCESS != (ret = opal_cmd_line_parse(cmd_line, true, + if (OMPI_SUCCESS != (ret = opal_cmd_line_parse(cmd_line, false, argc, argv))) { return ret; }