Skip to content

Commit

Permalink
Merge pull request #451 from dash-project/feat-dart-handlefree
Browse files Browse the repository at this point in the history
Improvements to dash::Future
  • Loading branch information
devreal authored Feb 14, 2018
2 parents 3ac7495 + c8014b5 commit 8b2d4b3
Show file tree
Hide file tree
Showing 5 changed files with 555 additions and 448 deletions.
47 changes: 47 additions & 0 deletions dart-if/include/dash/dart/if/dart_communication.h
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,23 @@ dart_ret_t dart_test_local(
dart_handle_t * handle,
int32_t * result) DART_NOTHROW;

/**
 * Test for the completion of an operation and ensure remote completion.
 * If the transfer completed, the handle is invalidated and may not be used
 * in another \c dart_wait or \c dart_test operation.
 *
 * \param handleptr        Pointer to the handle of an operation to test for
 *                         completion.
 * \param[out] is_finished \c True if the operation has completed.
 *
 * \return \c DART_OK on success, any other of \ref dart_ret_t otherwise.
 *
 * \threadsafe
 * \ingroup DartCommunication
 */
dart_ret_t dart_test(
  dart_handle_t * handleptr,
  int32_t       * is_finished) DART_NOTHROW;

/**
* Test for the local completion of operations.
* If the transfers completed, the handles are invalidated and may not be
Expand All @@ -617,6 +634,36 @@ dart_ret_t dart_testall_local(
size_t n,
int32_t * result) DART_NOTHROW;

/**
 * Test for the completion of operations and ensure remote completion.
 * If the transfers completed, the handles are invalidated and may not be
 * used in another \c dart_wait or \c dart_test operation.
 *
 * \param handles          Array of handles of operations to test for
 *                         completion.
 * \param n                Number of \c handles to test for completion.
 * \param[out] is_finished \c True if all operations have completed.
 *
 * \return \c DART_OK on success, any other of \ref dart_ret_t otherwise.
 *
 * \threadsafe
 * \ingroup DartCommunication
 */
dart_ret_t dart_testall(
  dart_handle_t   handles[],
  size_t          n,
  int32_t       * is_finished) DART_NOTHROW;

/**
* Free the handle without testing or waiting for completion of the operation.
*
* As implemented by the MPI backend, a \c NULL pointer or a handle that is
* already \c DART_HANDLE_NULL is accepted and nothing is done; otherwise the
* handle is released and \c *handle is set to \c DART_HANDLE_NULL.
*
* \param handle Pointer to the handle to free.
*
* \return \c DART_OK on success, any other of \ref dart_ret_t otherwise.
*
* \ingroup DartCommunication
*/
dart_ret_t dart_handle_free(
dart_handle_t * handle) DART_NOTHROW;

/** \} */

/**
Expand Down
205 changes: 177 additions & 28 deletions dart-impl/mpi/src/dart_communication.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@
* Temporary space allocation:
* - on the stack for allocations <=64B
* - on the heap otherwise
* Mainly meant to be used in dart_waitall_* and dart_testall_local
* Mainly meant to be used in dart_waitall* and dart_testall*
*/
/* The whole expansion is parenthesized so that the conditional cannot bind
 * to operators surrounding the macro at the use site
 * (e.g. `ALLOC_TMP(n) != NULL`). */
#define ALLOC_TMP(__size) \
  (((__size) <= 64) ? alloca((__size)) : malloc((__size)))
/**
Expand Down Expand Up @@ -172,8 +172,8 @@ dart__mpi__get(
{
if (reqs != NULL) {
return MPI_Rget(origin_addr, origin_count, origin_datatype,
target_rank, target_disp, target_count, target_datatype,
win, &reqs[(*num_reqs)++]);
target_rank, target_disp, target_count, target_datatype,
win, &reqs[(*num_reqs)++]);
} else {
return MPI_Get(origin_addr, origin_count, origin_datatype,
target_rank, target_disp, target_count,
Expand All @@ -191,8 +191,8 @@ dart__mpi__put(
{
if (reqs != NULL) {
return MPI_Rput(origin_addr, origin_count, origin_datatype,
target_rank, target_disp, target_count, target_datatype,
win, &reqs[(*num_reqs)++]);
target_rank, target_disp, target_count, target_datatype,
win, &reqs[(*num_reqs)++]);
} else {
return MPI_Put(origin_addr, origin_count, origin_datatype,
target_rank, target_disp, target_count,
Expand Down Expand Up @@ -609,7 +609,7 @@ dart_ret_t dart_accumulate(

CHECK_UNITID_RANGE(team_unit_id, team_data);

DART_LOG_DEBUG("dart_accumulate() nelem:%zu dtype:%d op:%d unit:%d",
DART_LOG_DEBUG("dart_accumulate() nelem:%zu dtype:%ld op:%d unit:%d",
nelem, dtype, op, team_unit_id.id);

dart_segment_info_t *seginfo = dart_segment_get_info(
Expand Down Expand Up @@ -703,7 +703,7 @@ dart_ret_t dart_fetch_and_op(

CHECK_UNITID_RANGE(team_unit_id, team_data);

DART_LOG_DEBUG("dart_fetch_and_op() dtype:%d op:%d unit:%d "
DART_LOG_DEBUG("dart_fetch_and_op() dtype:%ld op:%d unit:%d "
"offset:%"PRIu64" segid:%d",
dtype, op, team_unit_id.id,
gptr.addr_or_offs.offset, seg_id);
Expand Down Expand Up @@ -755,7 +755,7 @@ dart_ret_t dart_compare_and_swap(

CHECK_UNITID_RANGE(team_unit_id, team_data);

DART_LOG_TRACE("dart_compare_and_swap() dtype:%d unit:%d offset:%"PRIu64,
DART_LOG_TRACE("dart_compare_and_swap() dtype:%ld unit:%d offset:%"PRIu64,
dtype, team_unit_id.id, gptr.addr_or_offs.offset);

dart_segment_info_t *seginfo = dart_segment_get_info(
Expand Down Expand Up @@ -1372,6 +1372,27 @@ dart_ret_t dart_waitall_local(
return ret;
}

/**
 * Ensure remote completion for every handle that requires it.
 *
 * Calls \c MPI_Win_flush on the (dest, win) pair of each non-null handle
 * whose \c needs_flush flag is set. The handles themselves are neither
 * modified nor freed here.
 *
 * \param handles Array of handles; \c DART_HANDLE_NULL entries are skipped.
 * \param n       Number of entries in \c handles.
 *
 * \return \c DART_OK if every flush succeeded, \c DART_ERR_INVAL as soon as
 *         one \c MPI_Win_flush fails (later handles are then not flushed).
 */
static
dart_ret_t wait_remote_completion(
  dart_handle_t *handles,
  size_t         n)
{
  for (size_t i = 0; i < n; i++) {
    if (handles[i] == DART_HANDLE_NULL || !handles[i]->needs_flush) {
      continue;
    }
    /* This helper is shared by several callers, so the message names the
     * helper itself instead of one particular caller. */
    DART_LOG_DEBUG("wait_remote_completion: -- "
                   "MPI_Win_flush(handle[%zu]: %p, dest: %d)",
                   i, (void*)handles[i], handles[i]->dest);
    /*
     * MPI_Win_flush to wait for remote completion if required:
     */
    if (MPI_Win_flush(handles[i]->dest, handles[i]->win) != MPI_SUCCESS) {
      return DART_ERR_INVAL;
    }
  }
  return DART_OK;
}

dart_ret_t dart_waitall(
dart_handle_t handles[],
size_t n)
Expand Down Expand Up @@ -1444,19 +1465,10 @@ dart_ret_t dart_waitall(
* wait for completion of MPI requests at origins and targets:
*/
DART_LOG_DEBUG("dart_waitall: waiting for remote completion");
for (size_t i = 0; i < n; i++) {
if (handles[i] != DART_HANDLE_NULL && handles[i]->needs_flush) {
DART_LOG_DEBUG("dart_waitall: -- MPI_Win_flush(handle[%zu]: %p, dest: %d))",
i, (void*)handles[i], handles[i]->dest);
/*
* MPI_Win_flush to wait for remote completion if required:
*/
if (MPI_Win_flush(handles[i]->dest, handles[i]->win) != MPI_SUCCESS) {
DART_LOG_ERROR("dart_waitall: MPI_Win_flush failed");
FREE_TMP(2 * n * sizeof(MPI_Request), mpi_req);
return DART_ERR_INVAL;
}
}
if (DART_OK != wait_remote_completion(handles, n)) {
DART_LOG_ERROR("dart_waitall: MPI_Win_flush failed");
FREE_TMP(2 * n * sizeof(MPI_Request), mpi_req);
return DART_ERR_OTHER;
}

/*
Expand All @@ -1480,6 +1492,39 @@ dart_ret_t dart_waitall(
return DART_OK;
}

/**
 * Drop-in replacement for MPI_Testall that works around a broken
 * implementation in MPICH (MPICH <= 3.2.1 and its derivatives seem to be
 * affected).
 *
 * On affected versions every request is tested individually with MPI_Test
 * and the results are combined; otherwise the call is forwarded to
 * MPI_Testall with statuses ignored.
 *
 * \param num_reqs      Number of entries in \c reqs.
 * \param reqs          Requests to test.
 * \param[out] flag_ptr Receives the combined completion flag. Not written
 *                      if an individual MPI_Test fails on the fallback path.
 *
 * \return \c MPI_SUCCESS or the error code of the failing MPI call.
 */
static inline
int
dart__mpi__testall(int num_reqs, MPI_Request *reqs, int *flag_ptr)
{
#if defined(MPICH_NUMVERSION) && MPICH_NUMVERSION <= 30201300
  int all_done = 1;
  int idx      = 0;
  while (idx < num_reqs) {
    int done;
    /*
     * A request that tests successfully is reset to MPI_REQUEST_NULL,
     * and MPI_REQUEST_NULL may safely be handed to MPI_Test again, so
     * repeated calls eventually see every request as completed.
     */
    int rc = MPI_Test(reqs + idx, &done, MPI_STATUS_IGNORE);
    if (rc != MPI_SUCCESS) {
      return rc;
    }
    // a single pending request clears the combined flag
    all_done &= done;
    ++idx;
  }
  // every request was tested without error
  *flag_ptr = all_done;
  return MPI_SUCCESS;
#else
  return MPI_Testall(num_reqs, reqs,
                     flag_ptr, MPI_STATUSES_IGNORE);
#endif //defined(MPICH_NUMVERSION) && MPICH_NUMVERSION <= 30201300
}

dart_ret_t dart_test_local(
dart_handle_t * handleptr,
int32_t * is_finished)
Expand All @@ -1496,11 +1541,9 @@ dart_ret_t dart_test_local(
*is_finished = 0;

dart_handle_t handle = *handleptr;
if (MPI_Testall(handle->num_reqs, handle->reqs,
&flag, MPI_STATUSES_IGNORE) != MPI_SUCCESS) {
DART_LOG_ERROR("dart_test_local: MPI_Test failed!");
return DART_ERR_OTHER;
}
CHECK_MPI_RET(
dart__mpi__testall(handle->num_reqs, handle->reqs, &flag),
"MPI_Testall");

if (flag) {
// deallocate handle
Expand All @@ -1512,6 +1555,43 @@ dart_ret_t dart_test_local(
return DART_OK;
}


/**
 * Test a single operation for completion, including remote completion.
 *
 * A NULL pointer, a null handle or a handle without requests is reported as
 * finished right away. Otherwise the handle's requests are tested; once all
 * of them completed, the window is flushed if the handle asks for it, the
 * handle is freed and \c *handleptr is reset to \c DART_HANDLE_NULL.
 *
 * \param handleptr        Pointer to the handle to test.
 * \param[out] is_finished Set to 1 if the operation completed, 0 otherwise.
 *
 * \return \c DART_OK on success; MPI failures are handled by
 *         \c CHECK_MPI_RET.
 */
dart_ret_t dart_test(
  dart_handle_t * handleptr,
  int32_t       * is_finished)
{
  int all_local_done;

  DART_LOG_DEBUG("dart_test()");

  int nothing_pending = (handleptr == NULL);
  if (!nothing_pending) {
    nothing_pending = (*handleptr == DART_HANDLE_NULL);
  }
  if (!nothing_pending) {
    nothing_pending = ((*handleptr)->num_reqs == 0);
  }
  if (nothing_pending) {
    *is_finished = 1;
    return DART_OK;
  }

  *is_finished = 0;

  dart_handle_t pending = *handleptr;
  CHECK_MPI_RET(
    dart__mpi__testall(pending->num_reqs, pending->reqs, &all_local_done),
    "MPI_Testall");

  if (all_local_done) {
    // local completion reached: enforce remote completion where required
    if (pending->needs_flush) {
      CHECK_MPI_RET(
        MPI_Win_flush(pending->dest, pending->win),
        "MPI_Win_flush"
      );
    }
    // release the handle and invalidate the caller's reference
    free(pending);
    *handleptr   = DART_HANDLE_NULL;
    *is_finished = 1;
  }
  DART_LOG_DEBUG("dart_test > finished");
  return DART_OK;
}

dart_ret_t dart_testall_local(
dart_handle_t handles[],
size_t n,
Expand Down Expand Up @@ -1541,8 +1621,7 @@ dart_ret_t dart_testall_local(
}

if (r_n) {
if (MPI_Testall(r_n, mpi_req, &flag,
MPI_STATUSES_IGNORE) != MPI_SUCCESS){
if (dart__mpi__testall(r_n, mpi_req, &flag) != MPI_SUCCESS){
FREE_TMP(2 * n * sizeof(MPI_Request), mpi_req);
DART_LOG_ERROR("dart_testall_local: MPI_Testall failed!");
return DART_ERR_OTHER;
Expand All @@ -1566,6 +1645,76 @@ dart_ret_t dart_testall_local(
return DART_OK;
}


/**
 * Test a set of operations for completion, including remote completion.
 *
 * All non-null requests of all non-null handles are tested together. If all
 * of them completed, handles with \c needs_flush set are flushed, every
 * handle is freed and its array entry is reset to \c DART_HANDLE_NULL.
 * If there is nothing to test, the operations are reported as finished.
 *
 * \param handles          Array of handles to test; \c DART_HANDLE_NULL
 *                         entries are skipped.
 * \param n                Number of entries in \c handles.
 * \param[out] is_finished Set to a non-zero value if all operations
 *                         completed, 0 otherwise.
 *
 * \return \c DART_OK on success, \c DART_ERR_OTHER if the temporary request
 *         buffer cannot be allocated or an MPI call fails.
 */
dart_ret_t dart_testall(
  dart_handle_t   handles[],
  size_t          n,
  int32_t       * is_finished)
{
  DART_LOG_DEBUG("dart_testall()");
  if (handles == NULL || n == 0) {
    DART_LOG_DEBUG("dart_testall: empty handles");
    /* Nothing to wait for: report completion instead of leaving the
     * output parameter unset. */
    if (is_finished != NULL) {
      *is_finished = 1;
    }
    return DART_OK;
  }

  /* Room for two requests per handle -- presumably the per-handle maximum;
   * TODO confirm against the handle definition. */
  MPI_Request *mpi_req = ALLOC_TMP(2 * n * sizeof (MPI_Request));
  if (mpi_req == NULL) {
    DART_LOG_ERROR("dart_testall: failed to allocate temporary requests");
    return DART_ERR_OTHER;
  }

  /* collect all active requests into one contiguous array */
  size_t r_n = 0;
  for (size_t i = 0; i < n; ++i) {
    if (handles[i] != DART_HANDLE_NULL) {
      for (uint8_t j = 0; j < handles[i]->num_reqs; ++j) {
        if (handles[i]->reqs[j] != MPI_REQUEST_NULL){
          mpi_req[r_n] = handles[i]->reqs[j];
          ++r_n;
        }
      }
    }
  }

  if (r_n) {
    /* test into a plain int, the type the MPI wrapper expects */
    int flag = 0;
    DART_LOG_TRACE("  MPI_Testall on %zu requests", r_n);
    int mpi_ret = dart__mpi__testall((int)r_n, mpi_req, &flag);

    /*
     * Write the (possibly updated) requests back to their handles.
     * The per-request fallback in dart__mpi__testall may complete single
     * requests and reset them to MPI_REQUEST_NULL even if the overall test
     * is not finished; without the write-back the handles would keep stale
     * requests that get tested again in a later call.
     * The traversal mirrors the collection loop above, so entries match.
     */
    size_t k = 0;
    for (size_t i = 0; i < n; ++i) {
      if (handles[i] != DART_HANDLE_NULL) {
        for (uint8_t j = 0; j < handles[i]->num_reqs; ++j) {
          if (handles[i]->reqs[j] != MPI_REQUEST_NULL) {
            handles[i]->reqs[j] = mpi_req[k];
            ++k;
          }
        }
      }
    }

    if (mpi_ret != MPI_SUCCESS){
      DART_LOG_ERROR("dart_testall: MPI_Testall failed");
      FREE_TMP(2 * n * sizeof(MPI_Request), mpi_req);
      return DART_ERR_OTHER;
    }

    *is_finished = flag;

    if (flag) {
      /*
       * wait for completion of MPI requests at origins and targets:
       */
      DART_LOG_DEBUG("dart_testall: waiting for remote completion");
      if (DART_OK != wait_remote_completion(handles, n)) {
        DART_LOG_ERROR("dart_testall: MPI_Win_flush failed");
        FREE_TMP(2 * n * sizeof(MPI_Request), mpi_req);
        return DART_ERR_OTHER;
      }

      for (size_t i = 0; i < n; i++) {
        if (handles[i] != DART_HANDLE_NULL) {
          // free the handle
          free(handles[i]);
          handles[i] = DART_HANDLE_NULL;
        }
      }
    }
  } else {
    /* no active requests at all: everything is already complete */
    *is_finished = 1;
  }
  FREE_TMP(2 * n * sizeof(MPI_Request), mpi_req);
  DART_LOG_DEBUG("dart_testall > finished");
  return DART_OK;
}

/**
 * Release a handle without testing or waiting for its operation.
 *
 * Only the handle's memory is released here; no MPI call is made on the
 * requests stored in it. A NULL pointer or a handle that already is
 * \c DART_HANDLE_NULL is left untouched.
 *
 * \param handleptr Pointer to the handle to free; reset to
 *                  \c DART_HANDLE_NULL after the handle was released.
 *
 * \return Always \c DART_OK.
 */
dart_ret_t dart_handle_free(
  dart_handle_t * handleptr)
{
  if (handleptr == NULL || *handleptr == DART_HANDLE_NULL) {
    // nothing to release
    return DART_OK;
  }

  free(*handleptr);
  *handleptr = DART_HANDLE_NULL;
  return DART_OK;
}

/* -- Dart collective operations -- */

static int _dart_barrier_count = 0;
Expand Down
Loading

0 comments on commit 8b2d4b3

Please sign in to comment.