diff --git a/dart-if/include/dash/dart/if/dart_communication.h b/dart-if/include/dash/dart/if/dart_communication.h index 3987ae083..23e7b96a6 100644 --- a/dart-if/include/dash/dart/if/dart_communication.h +++ b/dart-if/include/dash/dart/if/dart_communication.h @@ -598,6 +598,23 @@ dart_ret_t dart_test_local( dart_handle_t * handle, int32_t * result) DART_NOTHROW; +/** + * Test for the completion of an operation and ensure remote completion. + * If the transfer completed, the handle is invalidated and may not be used + * in another \c dart_wait or \c dart_test operation. + * + * \param handleptr Pointer to the handle of an operation to test for completion. + * \param[out] is_finished Set to 1 if the operation has completed, 0 otherwise. + * + * \return \c DART_OK on success, any other of \ref dart_ret_t otherwise. + * + * \threadsafe + * \ingroup DartCommunication + */ +dart_ret_t dart_test( + dart_handle_t * handleptr, + int32_t * is_finished); + /** * Test for the local completion of operations. * If the transfers completed, the handles are invalidated and may not be @@ -617,6 +634,36 @@ dart_ret_t dart_testall_local( size_t n, int32_t * result) DART_NOTHROW; +/** + * Test for the completion of operations and ensure remote completion. + * If the transfers completed, the handles are invalidated and may not be + * used in another \c dart_wait or \c dart_test operation. + * + * \param handles Array of handles of operations to test for completion. + * \param n Number of \c handles to test for completion. + * \param[out] is_finished Non-zero if all operations have completed. + * + * \return \c DART_OK on success, any other of \ref dart_ret_t otherwise. + * + * \threadsafe + * \ingroup DartCommunication + */ +dart_ret_t dart_testall( + dart_handle_t handles[], + size_t n, + int32_t * is_finished); + +/** + * Free the handle without testing or waiting for completion of the operation. + * + * \param handle Pointer to the handle to free. 
+ * + * \return \c DART_OK on success, any other of \ref dart_ret_t otherwise. + * + */ +dart_ret_t dart_handle_free( + dart_handle_t * handle) DART_NOTHROW; + /** \} */ /** diff --git a/dart-impl/mpi/src/dart_communication.c b/dart-impl/mpi/src/dart_communication.c index 4fad4ada9..c8a923062 100644 --- a/dart-impl/mpi/src/dart_communication.c +++ b/dart-impl/mpi/src/dart_communication.c @@ -77,7 +77,7 @@ * Temporary space allocation: * - on the stack for allocations <=64B * - on the heap otherwise - * Mainly meant to be used in dart_waitall_* and dart_testall_local + * Mainly meant to be used in dart_waitall* and dart_testall* */ #define ALLOC_TMP(__size) ((__size)<=64) ? alloca((__size)) : malloc((__size)) /** @@ -172,8 +172,8 @@ dart__mpi__get( { if (reqs != NULL) { return MPI_Rget(origin_addr, origin_count, origin_datatype, - target_rank, target_disp, target_count, target_datatype, - win, &reqs[(*num_reqs)++]); + target_rank, target_disp, target_count, target_datatype, + win, &reqs[(*num_reqs)++]); } else { return MPI_Get(origin_addr, origin_count, origin_datatype, target_rank, target_disp, target_count, @@ -191,8 +191,8 @@ dart__mpi__put( { if (reqs != NULL) { return MPI_Rput(origin_addr, origin_count, origin_datatype, - target_rank, target_disp, target_count, target_datatype, - win, &reqs[(*num_reqs)++]); + target_rank, target_disp, target_count, target_datatype, + win, &reqs[(*num_reqs)++]); } else { return MPI_Put(origin_addr, origin_count, origin_datatype, target_rank, target_disp, target_count, @@ -609,7 +609,7 @@ dart_ret_t dart_accumulate( CHECK_UNITID_RANGE(team_unit_id, team_data); - DART_LOG_DEBUG("dart_accumulate() nelem:%zu dtype:%d op:%d unit:%d", + DART_LOG_DEBUG("dart_accumulate() nelem:%zu dtype:%ld op:%d unit:%d", nelem, dtype, op, team_unit_id.id); dart_segment_info_t *seginfo = dart_segment_get_info( @@ -703,7 +703,7 @@ dart_ret_t dart_fetch_and_op( CHECK_UNITID_RANGE(team_unit_id, team_data); - DART_LOG_DEBUG("dart_fetch_and_op() dtype:%d 
op:%d unit:%d " + DART_LOG_DEBUG("dart_fetch_and_op() dtype:%ld op:%d unit:%d " "offset:%"PRIu64" segid:%d", dtype, op, team_unit_id.id, gptr.addr_or_offs.offset, seg_id); @@ -755,7 +755,7 @@ dart_ret_t dart_compare_and_swap( CHECK_UNITID_RANGE(team_unit_id, team_data); - DART_LOG_TRACE("dart_compare_and_swap() dtype:%d unit:%d offset:%"PRIu64, + DART_LOG_TRACE("dart_compare_and_swap() dtype:%ld unit:%d offset:%"PRIu64, dtype, team_unit_id.id, gptr.addr_or_offs.offset); dart_segment_info_t *seginfo = dart_segment_get_info( @@ -1372,6 +1372,27 @@ dart_ret_t dart_waitall_local( return ret; } +static +dart_ret_t wait_remote_completion( + dart_handle_t *handles, + size_t n +) +{ + for (size_t i = 0; i < n; i++) { + if (handles[i] != DART_HANDLE_NULL && handles[i]->needs_flush) { + DART_LOG_DEBUG("wait_remote_completion: -- MPI_Win_flush(handle[%zu]: %p, dest: %d)", + i, (void*)handles[i], handles[i]->dest); + /* + * Enforce remote completion; shared by dart_waitall and dart_testall: + */ + if (MPI_Win_flush(handles[i]->dest, handles[i]->win) != MPI_SUCCESS) { + return DART_ERR_INVAL; + } + } + } + return DART_OK; +} + dart_ret_t dart_waitall( dart_handle_t handles[], size_t n) @@ -1444,19 +1465,10 @@ dart_ret_t dart_waitall( * wait for completion of MPI requests at origins and targets: */ DART_LOG_DEBUG("dart_waitall: waiting for remote completion"); - for (size_t i = 0; i < n; i++) { - if (handles[i] != DART_HANDLE_NULL && handles[i]->needs_flush) { - DART_LOG_DEBUG("dart_waitall: -- MPI_Win_flush(handle[%zu]: %p, dest: %d))", - i, (void*)handles[i], handles[i]->dest); - /* - * MPI_Win_flush to wait for remote completion if required: - */ - if (MPI_Win_flush(handles[i]->dest, handles[i]->win) != MPI_SUCCESS) { - DART_LOG_ERROR("dart_waitall: MPI_Win_flush failed"); - FREE_TMP(2 * n * sizeof(MPI_Request), mpi_req); - return DART_ERR_INVAL; - } - } + if (DART_OK != wait_remote_completion(handles, n)) { + DART_LOG_ERROR("dart_waitall: MPI_Win_flush failed"); + FREE_TMP(2 * n * 
sizeof(MPI_Request), mpi_req); + return DART_ERR_OTHER; } /* @@ -1480,6 +1492,39 @@ dart_ret_t dart_waitall( return DART_OK; } +/** + * Wrapper around MPI_Testall to account for broken MPICH implementation. + * MPICH <= 3.2.1 and its derivatives seem to be affected. + */ +inline static +int +dart__mpi__testall(int num_reqs, MPI_Request *reqs, int *flag_ptr) +{ +#if defined(MPICH_NUMVERSION) && MPICH_NUMVERSION <= 30201300 + int flag_result = 1; + for (int i = 0; i < num_reqs; ++i) { + int flag; + /* + * if the test succeeds the request is set to MPI_REQUEST_NULL, + * which can be safely passed to MPI_Test again. + * Eventually we will have all requests tested successfully. + */ + int ret = MPI_Test(&reqs[i], &flag, MPI_STATUS_IGNORE); + if (ret != MPI_SUCCESS) { + return ret; + } + // one incomplete request will flip the flag to 0 + flag_result &= flag; + } + *flag_ptr = flag_result; + // we checked all requests successfully + return MPI_SUCCESS; +#else + return MPI_Testall(num_reqs, reqs, + flag_ptr, MPI_STATUSES_IGNORE); +#endif //defined(MPICH_NUMVERSION) && MPICH_NUMVERSION <= 30201300 +} + dart_ret_t dart_test_local( dart_handle_t * handleptr, int32_t * is_finished) @@ -1496,11 +1541,9 @@ dart_ret_t dart_test_local( *is_finished = 0; dart_handle_t handle = *handleptr; - if (MPI_Testall(handle->num_reqs, handle->reqs, - &flag, MPI_STATUSES_IGNORE) != MPI_SUCCESS) { - DART_LOG_ERROR("dart_test_local: MPI_Test failed!"); - return DART_ERR_OTHER; - } + CHECK_MPI_RET( + dart__mpi__testall(handle->num_reqs, handle->reqs, &flag), + "MPI_Testall"); if (flag) { // deallocate handle @@ -1512,6 +1555,43 @@ dart_ret_t dart_test_local( return DART_OK; } + +dart_ret_t dart_test( + dart_handle_t * handleptr, + int32_t * is_finished) +{ + int flag; + + DART_LOG_DEBUG("dart_test()"); + if (handleptr == NULL || + *handleptr == DART_HANDLE_NULL || + (*handleptr)->num_reqs == 0) { + *is_finished = 1; + return DART_OK; + } + *is_finished = 0; + + dart_handle_t handle = *handleptr; + 
CHECK_MPI_RET( + dart__mpi__testall(handle->num_reqs, handle->reqs, &flag), + "MPI_Testall"); + + if (flag) { + if (handle->needs_flush) { + CHECK_MPI_RET( + MPI_Win_flush(handle->dest, handle->win), + "MPI_Win_flush" + ); + } + // deallocate handle + free(handle); + *handleptr = DART_HANDLE_NULL; + *is_finished = 1; + } + DART_LOG_DEBUG("dart_test > finished"); + return DART_OK; +} + dart_ret_t dart_testall_local( dart_handle_t handles[], size_t n, @@ -1541,8 +1621,7 @@ dart_ret_t dart_testall_local( } if (r_n) { - if (MPI_Testall(r_n, mpi_req, &flag, - MPI_STATUSES_IGNORE) != MPI_SUCCESS){ + if (dart__mpi__testall(r_n, mpi_req, &flag) != MPI_SUCCESS){ FREE_TMP(2 * n * sizeof(MPI_Request), mpi_req); DART_LOG_ERROR("dart_testall_local: MPI_Testall failed!"); return DART_ERR_OTHER; @@ -1566,6 +1645,78 @@ dart_ret_t dart_testall_local( return DART_OK; } + +dart_ret_t dart_testall( + dart_handle_t handles[], + size_t n, + int32_t * is_finished) +{ + DART_LOG_DEBUG("dart_testall()"); + if (handles == NULL || n == 0) { + DART_LOG_DEBUG("dart_testall: empty handles"); + /* nothing pending: report completion, as dart_test does for a null handle */ + *is_finished = 1; + return DART_OK; + } + + MPI_Request *mpi_req = ALLOC_TMP(2 * n * sizeof (MPI_Request)); + size_t r_n = 0; + for (size_t i = 0; i < n; ++i) { + if (handles[i] != DART_HANDLE_NULL) { + for (uint8_t j = 0; j < handles[i]->num_reqs; ++j) { + if (handles[i]->reqs[j] != MPI_REQUEST_NULL){ + mpi_req[r_n] = handles[i]->reqs[j]; + ++r_n; + } + } + } + } + + if (r_n) { + DART_LOG_TRACE(" MPI_Testall on %zu requests", r_n); + if (dart__mpi__testall(r_n, mpi_req, is_finished) != MPI_SUCCESS){ + DART_LOG_ERROR("dart_testall: MPI_Testall failed"); + FREE_TMP(2 * n * sizeof(MPI_Request), mpi_req); + return DART_ERR_OTHER; + } + + if (*is_finished) { + /* + * wait for completion of MPI requests at origins and targets: + */ + DART_LOG_DEBUG("dart_testall: waiting for remote completion"); + if (DART_OK != wait_remote_completion(handles, n)) { + DART_LOG_ERROR("dart_testall: MPI_Win_flush failed"); + FREE_TMP(2 
* n * sizeof(MPI_Request), mpi_req); + return DART_ERR_OTHER; + } + + for (size_t i = 0; i < n; i++) { + if (handles[i] != DART_HANDLE_NULL) { + // free the handle + free(handles[i]); + handles[i] = DART_HANDLE_NULL; + } + } + } + } else { + *is_finished = 1; + } + FREE_TMP(2 * n * sizeof(MPI_Request), mpi_req); + DART_LOG_DEBUG("dart_testall > finished"); + return DART_OK; +} + +dart_ret_t dart_handle_free( + dart_handle_t * handleptr) +{ + if (handleptr != NULL && *handleptr != DART_HANDLE_NULL) { + free(*handleptr); + *handleptr = DART_HANDLE_NULL; + } + return DART_OK; +} + /* -- Dart collective operations -- */ static int _dart_barrier_count = 0; diff --git a/dash/include/dash/Future.h b/dash/include/dash/Future.h index ab796ba25..e0e903da6 100644 --- a/dash/include/dash/Future.h +++ b/dash/include/dash/Future.h @@ -16,14 +16,17 @@ template class Future { private: - typedef Future self_t; - typedef std::function func_t; + typedef Future self_t; + typedef std::function get_func_t; + typedef std::function test_func_t; + typedef std::function destroy_func_t; private: - func_t _func; - ResultT _value; - bool _ready = false; - bool _has_func = false; + get_func_t _get_func; + test_func_t _test_func; + destroy_func_t _destroy_func; + ResultT _value; + bool _ready = false; public: // For ostream output @@ -33,55 +36,71 @@ class Future const Future & future); public: + Future() - : _ready(false), - _has_func(false) + : _ready(false) + { } + + Future(ResultT & result) + : _value(result), + _ready(true) { } - Future(const func_t & func) - : _func(func), - _ready(false), - _has_func(true) + Future(const get_func_t & func) + : _get_func(func) { } Future( - const self_t & other) - : _func(other._func), - _value(other._value), - _ready(other._ready), - _has_func(other._has_func) + const get_func_t & get_func, + const test_func_t & test_func) + : _get_func(get_func), + _test_func(test_func) { } - Future & operator=(const self_t & other) - { - if (this != &other) { - 
_func = other._func; - _value = other._value; - _ready = other._ready; - _has_func = other._has_func; + Future( + const get_func_t & get_func, + const test_func_t & test_func, + const destroy_func_t & destroy_func) + : _get_func(get_func), + _test_func(test_func), + _destroy_func(destroy_func) + { } + + Future(const self_t& other) = delete; + Future(self_t&& other) = default; + + ~Future() { + if (_destroy_func) { + _destroy_func(); } - return *this; } + /// copy-assignment is not permitted + Future & operator=(const self_t& other) = delete; + Future & operator=(self_t&& other) = default; + void wait() { DASH_LOG_TRACE_VAR("Future.wait()", _ready); if (_ready) { return; } - if (!_has_func) { + if (!_get_func) { DASH_LOG_ERROR("Future.wait()", "No function"); DASH_THROW( dash::exception::RuntimeError, "Future not initialized with function"); } - _value = _func(); + _value = _get_func(); _ready = true; DASH_LOG_TRACE_VAR("Future.wait >", _ready); } - bool test() const + bool test() { + if (!_ready && _test_func) { + _ready = _test_func(&_value); + } return _ready; } diff --git a/dash/include/dash/algorithm/Copy.h b/dash/include/dash/algorithm/Copy.h index 2baff0bd2..f1f7fbf33 100644 --- a/dash/include/dash/algorithm/Copy.h +++ b/dash/include/dash/algorithm/Copy.h @@ -13,12 +13,6 @@ #include #include - -// #ifndef DASH__ALGORITHM__COPY__USE_WAIT -#define DASH__ALGORITHM__COPY__USE_FLUSH -// #define DASH__ALGORITHM__COPY__USE_WAIT -// #endif - namespace dash { #ifdef DOXYGEN @@ -111,9 +105,10 @@ template < typename ValueType, class GlobInputIt > ValueType * copy_impl( - GlobInputIt in_first, - GlobInputIt in_last, - ValueType * out_first) + GlobInputIt in_first, + GlobInputIt in_last, + ValueType * out_first, + std::vector & handles) { DASH_LOG_TRACE("dash::copy_impl()", "in_first:", in_first.pos(), @@ -144,42 +139,24 @@ ValueType * copy_impl( auto unit_last = pattern.unit_at(g_in_last.pos() - 1); DASH_LOG_TRACE_VAR("dash::copy_impl", unit_last); - // MPI uses offset 
type int, do not copy more than INT_MAX bytes: - size_type max_copy_elem = (std::numeric_limits::max() / - sizeof(ValueType)); size_type num_elem_copied = 0; - DASH_LOG_TRACE_VAR("dash::copy_impl", max_copy_elem); - if (num_elem_total > max_copy_elem) { - DASH_LOG_DEBUG("dash::copy_impl", - "cannot copy", num_elem_total, "elements", - "in a single dart_get operation"); - } if (unit_first == unit_last) { // Input range is located at a single remote unit: DASH_LOG_TRACE("dash::copy_impl", "input range at single unit"); - while (num_elem_copied < num_elem_total) { - // Number of elements left to copy: - auto total_elem_left = num_elem_total - num_elem_copied; - auto num_copy_elem = (num_elem_total > max_copy_elem) - ? max_copy_elem - : num_elem_total; - if (num_copy_elem > total_elem_left) { - num_copy_elem = total_elem_left; - } - DASH_LOG_TRACE("dash::copy_impl", - "copy max:", max_copy_elem, - "get elements:", num_copy_elem, - "total:", num_elem_total, - "copied:", num_elem_copied, - "left:", total_elem_left); - auto cur_in_first = g_in_first + num_elem_copied; - auto cur_out_first = out_first + num_elem_copied; - dash::internal::get_blocking( - cur_in_first.dart_gptr(), - cur_out_first, - num_copy_elem); - num_elem_copied += num_copy_elem; + DASH_LOG_TRACE("dash::copy_impl", + "get elements:", num_elem_total); + auto cur_in_first = g_in_first; + auto cur_out_first = out_first; + dart_handle_t handle; + dash::internal::get_handle( + cur_in_first.dart_gptr(), + cur_out_first, + num_elem_total, + &handle); + if (handle != DART_HANDLE_NULL) { + handles.push_back(handle); } + num_elem_copied = num_elem_total; } else { // Input range is spread over several remote units: DASH_LOG_TRACE("dash::copy_impl", "input range spans multiple units"); @@ -201,9 +178,7 @@ ValueType * copy_impl( // Number of elements left to copy: auto total_elem_left = num_elem_total - num_elem_copied; // Number of elements to copy in this iteration. 
- auto num_copy_elem = (num_unit_elem < max_copy_elem) - ? num_unit_elem - : max_copy_elem; + auto num_copy_elem = num_unit_elem; if (num_copy_elem > total_elem_left) { num_copy_elem = total_elem_left; } @@ -217,15 +192,18 @@ ValueType * copy_impl( "->", "unit elements:", num_unit_elem, "max elem/unit:", max_elem_per_unit, - "copy max:", max_copy_elem, "get elements:", num_copy_elem, "total:", num_elem_total, "copied:", num_elem_copied, "left:", total_elem_left); auto dest_ptr = out_first + num_elem_copied; auto src_gptr = cur_in_first.dart_gptr(); - dash::internal::get_blocking(src_gptr, dest_ptr, num_copy_elem); + dart_handle_t handle; + dash::internal::get_handle(src_gptr, dest_ptr, num_copy_elem, &handle); num_elem_copied += num_copy_elem; + if (handle != DART_HANDLE_NULL) { + handles.push_back(handle); + } } } @@ -234,211 +212,6 @@ ValueType * copy_impl( return out_last; } -/** - * Asynchronous implementation of \c dash::copy (global to local) without - * optimization for local subrange. - */ -template < - typename ValueType, - class GlobInputIt > -dash::Future copy_async_impl( - GlobInputIt in_first, - GlobInputIt in_last, - ValueType * out_first) -{ - DASH_LOG_TRACE("dash::copy_async_impl()", - "in_first:", in_first.pos(), - "in_last:", in_last.pos(), - "out_first:", out_first); - auto pattern = in_first.pattern(); - typedef typename decltype(pattern)::index_type index_type; - typedef typename decltype(pattern)::size_type size_type; - size_type num_elem_total = dash::distance(in_first, in_last); - if (num_elem_total <= 0) { - DASH_LOG_TRACE("dash::copy_async_impl", "input range empty"); - return dash::Future([=]() { return out_first; }); - } - DASH_LOG_TRACE("dash::copy_async_impl", - "total elements:", num_elem_total, - "expected out_last:", out_first + num_elem_total); - // Input iterators could be relative to a view. Map first input iterator - // to global index range and use it to resolve last input iterator. 
- // Do not use in_last.global() as this would span over the relative input - // range. - auto g_in_first = in_first.global(); - auto g_in_last = g_in_first + num_elem_total; - DASH_LOG_TRACE("dash::copy_async_impl", - "g_in_first:", g_in_first.pos(), - "g_in_last:", g_in_last.pos()); - auto unit_first = pattern.unit_at(g_in_first.pos()); - DASH_LOG_TRACE_VAR("dash::copy_async_impl", unit_first); - auto unit_last = pattern.unit_at(g_in_last.pos() - 1); - DASH_LOG_TRACE_VAR("dash::copy_async_impl", unit_last); - - // Accessed global pointers to be flushed: -#ifdef DASH__ALGORITHM__COPY__USE_FLUSH - std::vector req_handles; -#else - std::vector req_handles; -#endif - - // MPI uses offset type int, do not copy more than INT_MAX bytes: - size_type max_copy_elem = (std::numeric_limits::max() / - sizeof(ValueType)); - size_type num_elem_copied = 0; - DASH_LOG_TRACE_VAR("dash::copy_async_impl", max_copy_elem); - if (num_elem_total > max_copy_elem) { - DASH_LOG_DEBUG("dash::copy_async_impl", - "cannot copy", num_elem_total, "elements", - "in a single dart_get operation"); - } - if (unit_first == unit_last) { - // Input range is located at a single remote unit: - DASH_LOG_TRACE("dash::copy_async_impl", "input range at single unit"); - while (num_elem_copied < num_elem_total) { - // Number of elements left to copy: - auto total_elem_left = num_elem_total - num_elem_copied; - auto num_copy_elem = (num_elem_total > max_copy_elem) - ? 
max_copy_elem - : num_elem_total; - if (num_copy_elem > total_elem_left) { - num_copy_elem = total_elem_left; - } - DASH_LOG_TRACE("dash::copy_async_impl", - "copy max:", max_copy_elem, - "get elements:", num_copy_elem, - "total:", num_elem_total, - "copied:", num_elem_copied, - "left:", total_elem_left); - auto cur_in_first = g_in_first + num_elem_copied; - auto cur_out_first = out_first + num_elem_copied; -#ifdef DASH__ALGORITHM__COPY__USE_FLUSH - dash::internal::get( - cur_in_first.dart_gptr(), - cur_out_first, - num_copy_elem); - req_handles.push_back(in_first.dart_gptr()); -#else - dart_handle_t get_handle; - dash::internal::get_handle( - cur_in_first.dart_gptr(), - cur_out_first, - num_copy_elem, - &get_handle); - if (get_handle != DART_HANDLE_NULL) { - req_handles.push_back(get_handle); - } -#endif - num_elem_copied += num_copy_elem; - } - } else { - // Input range is spread over several remote units: - DASH_LOG_TRACE("dash::copy_async_impl", "input range spans multiple units"); - // - // Copy elements from every unit: - // - while (num_elem_copied < num_elem_total) { - // Global iterator pointing at begin of current unit's input range: - auto cur_in_first = g_in_first + num_elem_copied; - // unit and local index of first element in current range segment: - auto local_pos = pattern.local(static_cast( - cur_in_first.pos())); - // Number of elements located at current source unit: - size_type max_elem_per_unit = pattern.local_size(local_pos.unit); - // Local offset of first element in input range at current unit: - auto l_in_first_idx = local_pos.index; - // Maximum number of elements to copy from current unit: - auto num_unit_elem = max_elem_per_unit - l_in_first_idx; - // Number of elements left to copy: - auto total_elem_left = num_elem_total - num_elem_copied; - // Number of elements to copy in this iteration. - auto num_copy_elem = (num_unit_elem < max_copy_elem) - ? 
num_unit_elem - : max_copy_elem; - if (num_copy_elem > total_elem_left) { - num_copy_elem = total_elem_left; - } - DASH_ASSERT_GT(num_copy_elem, 0, - "Number of element to copy is 0"); - DASH_LOG_TRACE("dash::copy_async_impl", - "start g_idx:", cur_in_first.pos(), - "->", - "unit:", local_pos.unit, - "l_idx:", l_in_first_idx, - "->", - "unit elements:", num_unit_elem, - "max elem/unit:", max_elem_per_unit, - "copy max:", max_copy_elem, - "get elements:", num_copy_elem, - "total:", num_elem_total, - "copied:", num_elem_copied, - "left:", total_elem_left); - auto src_gptr = cur_in_first.dart_gptr(); - auto dest_ptr = out_first + num_elem_copied; -#ifdef DASH__ALGORITHM__COPY__USE_FLUSH - dash::internal::get( - src_gptr, - dest_ptr, - num_copy_elem); - req_handles.push_back(src_gptr); -#else - dart_handle_t get_handle; - dash::internal::get_handle( - src_gptr, - dest_ptr, - num_copy_elem, - &get_handle); - if (get_handle != DART_HANDLE_NULL) { - req_handles.push_back(get_handle); - } -#endif - num_elem_copied += num_copy_elem; - } - } -#ifdef DASH_ENABLE_TRACE_LOGGING - for (auto gptr : req_handles) { - DASH_LOG_TRACE("dash::copy_async_impl", " req_handle:", gptr); - } -#endif - dash::Future result([=]() mutable { - // Wait for all get requests to complete: - ValueType * _out = out_first + num_elem_copied; - DASH_LOG_TRACE("dash::copy_async_impl [Future]()", - " wait for", req_handles.size(), "async get request"); - DASH_LOG_TRACE("dash::copy_async_impl [Future]", " flush:", req_handles); - DASH_LOG_TRACE("dash::copy_async_impl [Future]", " _out:", _out); -#ifdef DASH_ENABLE_TRACE_LOGGING - for (auto gptr : req_handles) { - DASH_LOG_TRACE("dash::copy_async_impl [Future]", " req_handle:", - gptr); - } -#endif -#ifdef DASH__ALGORITHM__COPY__USE_FLUSH - for (auto gptr : req_handles) { - dart_flush_local_all(gptr); - } -#else - if (req_handles.size() > 0) { - if (dart_waitall_local(&req_handles[0], req_handles.size()) - != DART_OK) { - 
DASH_LOG_ERROR("dash::copy_async_impl [Future]", - " dart_waitall_local failed"); - DASH_THROW( - dash::exception::RuntimeError, - "dash::copy_async_impl [Future]: dart_waitall_local failed"); - } - } else { - DASH_LOG_TRACE("dash::copy_async_impl [Future]", " No pending handles"); - } -#endif - DASH_LOG_TRACE("dash::copy_async_impl [Future] >", - " async requests completed, _out:", _out); - return _out; - }); - DASH_LOG_TRACE("dash::copy_async_impl >", " returning future"); - return result; -} - // ========================================================================= // Local to Global // ========================================================================= @@ -451,9 +224,10 @@ template < typename ValueType, class GlobOutputIt > GlobOutputIt copy_impl( - ValueType * in_first, - ValueType * in_last, - GlobOutputIt out_first) + ValueType * in_first, + ValueType * in_last, + GlobOutputIt out_first, + std::vector & handles) { DASH_LOG_TRACE("dash::copy_impl()", "l_in_first:", in_first, @@ -461,10 +235,15 @@ GlobOutputIt copy_impl( "g_out_first:", out_first.pos()); auto num_elements = std::distance(in_first, in_last); - dash::internal::put_blocking( + dart_handle_t handle; + dash::internal::put_handle( out_first.dart_gptr(), in_first, - num_elements); + num_elements, + &handle); + if (handle != DART_HANDLE_NULL) { + handles.push_back(handle); + } auto out_last = out_first + num_elements; DASH_LOG_TRACE("dash::copy_impl >", @@ -473,95 +252,6 @@ GlobOutputIt copy_impl( return out_last; } -/** - * Asynchronous implementation of \c dash::copy (local to global) without - * optimization for local subrange. 
- */ -template < - typename ValueType, - class GlobOutputIt > -dash::Future copy_async_impl( - ValueType * in_first, - ValueType * in_last, - GlobOutputIt out_first) -{ - DASH_LOG_TRACE("dash::copy_async_impl()", - "l_in_first:", in_first, - "l_in_last:", in_last, - "g_out_first:", out_first.dart_gptr()); - - // Accessed global pointers to be flushed: -#ifdef DASH__ALGORITHM__COPY__USE_FLUSH - std::vector req_handles; -#else - std::vector req_handles; -#endif - - auto num_copy_elem = std::distance(in_first, in_last); - auto src_ptr = in_first; - auto dest_gptr = out_first.dart_gptr(); -#ifdef DASH__ALGORITHM__COPY__USE_FLUSH - dash::internal::put( - dest_gptr, - src_ptr, - num_copy_elem); - req_handles.push_back(dest_gptr); -#else - dart_handle_t put_handle; - dash::internal::put_handle( - dest_gptr, - src_ptr, - num_copy_elem - &put_handle); - if (put_handle != DART_HANDLE_NULL) { - req_handles.push_back(put_handle); - } -#endif - -#ifdef DASH_ENABLE_TRACE_LOGGING - for (auto gptr : req_handles) { - DASH_LOG_TRACE("dash::copy_async_impl", " req_handle:", gptr); - } -#endif - dash::Future result([=]() mutable { - // Wait for all get requests to complete: - GlobOutputIt _out = out_first + num_copy_elem; - DASH_LOG_TRACE("dash::copy_async_impl [Future]()", - " wait for", req_handles.size(), "async put request"); - DASH_LOG_TRACE("dash::copy_async_impl [Future]", " flush:", req_handles); - DASH_LOG_TRACE("dash::copy_async_impl [Future]", " _out:", _out); -#ifdef DASH_ENABLE_TRACE_LOGGING - for (auto gptr : req_handles) { - DASH_LOG_TRACE("dash::copy_async_impl [Future]", " req_handle:", - gptr); - } -#endif -#ifdef DASH__ALGORITHM__COPY__USE_FLUSH - for (auto gptr : req_handles) { - dart_flush_all(gptr); - } -#else - if (req_handles.size() > 0) { - if (dart_waitall(&req_handles[0], req_handles.size()) - != DART_OK) { - DASH_LOG_ERROR("dash::copy_async_impl [Future]", - " dart_waitall failed"); - DASH_THROW( - dash::exception::RuntimeError, - "dash::copy_async_impl 
[Future]: dart_waitall failed"); - } - } else { - DASH_LOG_TRACE("dash::copy_async_impl [Future]", " No pending handles"); - } -#endif - DASH_LOG_TRACE("dash::copy_async_impl [Future] >", - " async requests completed, _out:", _out); - return _out; - }); - DASH_LOG_TRACE("dash::copy_async_impl >", " returning future"); - return result; -} - } // namespace internal @@ -585,17 +275,19 @@ dash::Future copy_async( ValueType * out_first) { const auto & team = in_first.team(); + + DASH_LOG_TRACE("dash::copy_async()", "async, global to local"); + if (in_first == in_last) { + DASH_LOG_TRACE("dash::copy_async", "input range empty"); + return dash::Future(out_first); + } + dash::util::UnitLocality uloc(team, team.myid()); // Size of L2 data cache line: int l2_line_size = uloc.hwinfo().cache_line_sizes[1]; bool use_memcpy = ((in_last - in_first) * sizeof(ValueType)) <= l2_line_size; - DASH_LOG_TRACE("dash::copy_async()", "async, global to local"); - if (in_first == in_last) { - DASH_LOG_TRACE("dash::copy_async", "input range empty"); - return dash::Future([=]() { return out_first; }); - } ValueType * dest_first = out_first; // Return value, initialize with begin of output range, indicating no values // have been copied: @@ -635,15 +327,15 @@ dash::Future copy_async( } DASH_LOG_TRACE("dash::copy_async", "finished local copy of", (out_last - out_first), "elements"); - return dash::Future([=]() { return out_last; }); + return dash::Future(out_last); } + auto handles = std::make_shared>(); + DASH_LOG_TRACE("dash::copy_async", "local range:", li_range_in.begin, li_range_in.end, "in_first.is_local:", in_first.is_local()); - // Futures of asynchronous get requests: - auto futures = std::vector< dash::Future >(); // Check if global input range is partially local: if (num_local_elem > 0) { // Part of the input range is local, copy local input subrange to local @@ -685,10 +377,10 @@ dash::Future copy_async( // ... [ --- copy --- | ... l ... | ........ 
] // ^ ^ ^ ^ // in_first l_in_first l_in_last in_last - auto fut_prelocal = dash::internal::copy_async_impl(g_in_first, - g_l_in_first, - dest_first); - futures.push_back(fut_prelocal); + dash::internal::copy_impl(g_in_first, + g_l_in_first, + dest_first, + *handles); // Advance output pointers: out_last += num_prelocal_elem; dest_first = out_last; @@ -708,10 +400,10 @@ dash::Future copy_async( // ... [ ........ | ... l ... | --- copy --- ] // ^ ^ ^ ^ // in_first l_in_first l_in_last in_last - auto fut_postlocal = dash::internal::copy_async_impl(g_l_in_last, - g_in_last, - dest_first); - futures.push_back(fut_postlocal); + dash::internal::copy_impl(g_l_in_last, + g_in_last, + dest_first, + *handles); out_last += num_postlocal_elem; } // @@ -757,26 +449,64 @@ dash::Future copy_async( } else { DASH_LOG_TRACE("dash::copy_async", "no local subrange"); // All elements in input range are remote - auto fut_all = dash::internal::copy_async_impl(in_first, - in_last, - dest_first); - futures.push_back(fut_all); + dash::internal::copy_impl(in_first, + in_last, + dest_first, + *handles); out_last = out_first + total_copy_elem; } DASH_LOG_TRACE("dash::copy_async", "preparing future"); - dash::Future fut_result([=]() mutable { - ValueType * _out = out_last; - DASH_LOG_TRACE("dash::copy_async [Future]()", - "wait for", futures.size(), "async copy requests"); - DASH_LOG_TRACE("dash::copy_async [Future]", " futures:", futures); - DASH_LOG_TRACE("dash::copy_async [Future]", " _out:", _out); - for (auto f : futures) { - f.wait(); + if (handles->size() == 0) { + DASH_LOG_TRACE("dash::copy_async >", "finished (no pending handles), ", + "out_last:", out_last); + return dash::Future(out_last); + } + dash::Future fut_result( + // wait + [=]() mutable { + // Wait for all get requests to complete: + ValueType * _out = out_last; + DASH_LOG_TRACE("dash::copy_async_impl [Future]()", + " wait for", handles->size(), "async get request"); + DASH_LOG_TRACE("dash::copy_async_impl [Future]", " 
_out:", _out); + if (handles->size() > 0) { + if (dart_waitall_local(handles->data(), handles->size()) + != DART_OK) { + DASH_LOG_ERROR("dash::copy_async_impl [Future]", + " dart_waitall_local failed"); + DASH_THROW( + dash::exception::RuntimeError, + "dash::copy_async_impl [Future]: dart_waitall_local failed"); + } + } else { + DASH_LOG_TRACE("dash::copy_async_impl [Future]", " No pending handles"); + } + DASH_LOG_TRACE("dash::copy_async_impl [Future] >", + " async requests completed, _out:", _out); + return _out; + }, + // test + [=](ValueType ** out) mutable { + int32_t flag; + DASH_ASSERT_RETURNS( + DART_OK, + dart_testall_local(handles->data(), handles->size(), &flag)); + if (flag) { + handles->clear(); + *out = out_last; + } + return (flag != 0); + }, + // destroy + [=]() mutable { + for (auto& handle : *handles) { + DASH_ASSERT_RETURNS( + DART_OK, + dart_handle_free(&handle)); + } } - DASH_LOG_TRACE("dash::copy_async [Future] >", "async requests completed", - "futures:", futures, "_out:", _out); - return _out; - }); + ); + DASH_LOG_TRACE("dash::copy_async >", "finished,", "expected out_last:", out_last); return fut_result; @@ -843,6 +573,8 @@ ValueType * copy( return out_last; } + std::vector handles; + DASH_LOG_TRACE("dash::copy", "local range:", li_range_in.begin, li_range_in.end, @@ -895,7 +627,8 @@ ValueType * copy( // in_first l_in_first l_in_last in_last out_last = dash::internal::copy_impl(g_in_first, g_l_in_first, - dest_first); + dest_first, + handles); // Advance output pointers: dest_first = out_last; } @@ -953,15 +686,24 @@ ValueType * copy( // in_first l_in_first l_in_last in_last out_last = dash::internal::copy_impl(g_l_in_last, g_in_last, - dest_first); + dest_first, + handles); } } else { DASH_LOG_TRACE("dash::copy", "no local subrange"); // All elements in input range are remote out_last = dash::internal::copy_impl(in_first, in_last, - dest_first); + dest_first, + handles); } + + if (handles.size() > 0) { + DASH_LOG_TRACE("dash::copy", 
"Waiting for remote transfers to complete,", + "num_handles: ", handles.size()); + dart_waitall_local(handles.data(), handles.size()); + } + DASH_LOG_TRACE("dash::copy >", "finished,", "out_last:", out_last); return out_last; @@ -985,10 +727,62 @@ dash::Future copy_async( ValueType * in_last, GlobOutputIt out_first) { - auto fut = dash::internal::copy_async_impl(in_first, - in_last, - out_first); - return fut; + auto handles = std::make_shared>(); + auto out_last = dash::internal::copy_impl(in_first, + in_last, + out_first, + *handles); + + if (handles->size() == 0) { + return dash::Future(out_last); + } + dash::Future fut_result( + // get + [=]() mutable { + // Wait for all get requests to complete: + GlobOutputIt _out = out_last; + DASH_LOG_TRACE("dash::copy_async [Future]()", + " wait for", handles->size(), "async put request"); + DASH_LOG_TRACE("dash::copy_async [Future]", " _out:", _out); + if (handles->size() > 0) { + if (dart_waitall(handles->data(), handles->size()) + != DART_OK) { + DASH_LOG_ERROR("dash::copy_async [Future]", + " dart_waitall failed"); + DASH_THROW( + dash::exception::RuntimeError, + "dash::copy_async [Future]: dart_waitall failed"); + } + } else { + DASH_LOG_TRACE("dash::copy_async [Future]", " No pending handles"); + } + handles->clear(); + DASH_LOG_TRACE("dash::copy_async [Future] >", + " async requests completed, _out:", _out); + return _out; + }, + // test + [=](GlobOutputIt *out) mutable { + int32_t flag; + DASH_ASSERT_RETURNS( + DART_OK, + dart_testall(handles->data(), handles->size(), &flag)); + if (flag) { + handles->clear(); + *out = out_last; + } + return (flag != 0); + }, + // destroy + [=]() mutable { + for (auto& handle : *handles) { + DASH_ASSERT_RETURNS( + DART_OK, + dart_handle_free(&handle)); + } + } + ); + return fut_result; } /** @@ -1021,6 +815,8 @@ GlobOutputIt copy( DASH_LOG_TRACE_VAR("dash::copy", li_range_out.end); // Number of elements in the local subrange: auto num_local_elem = li_range_out.end - 
li_range_out.begin; + // handles to wait on at the end + std::vector handles; // Check if part of the output range is local: if (num_local_elem > 0) { // Part of the output range is local @@ -1060,7 +856,8 @@ GlobOutputIt copy( out_last = dash::internal::copy_impl( in_first, in_first + l_elem_offset, - out_first); + out_first, + handles); } // Copy to remote elements succeeding the local subrange: if (g_l_offset_end < out_h_last.pos()) { @@ -1068,7 +865,8 @@ GlobOutputIt copy( out_last = dash::internal::copy_impl( in_first + l_elem_offset + num_local_elem, in_last, - out_first + num_local_elem); + out_first + num_local_elem, + handles); } } else { // All elements in output range are remote @@ -1076,8 +874,16 @@ GlobOutputIt copy( out_last = dash::internal::copy_impl( in_first, in_last, - out_first); + out_first, + handles); } + + if (handles.size() > 0) { + DASH_LOG_TRACE("dash::copy", "Waiting for remote transfers to complete,", + "num_handles: ", handles.size()); + dart_waitall_local(handles.data(), handles.size()); + } + return out_last; } diff --git a/dash/test/algorithm/CopyTest.cc b/dash/test/algorithm/CopyTest.cc index c27b60f70..8effa9ea4 100644 --- a/dash/test/algorithm/CopyTest.cc +++ b/dash/test/algorithm/CopyTest.cc @@ -414,7 +414,7 @@ TEST_F(CopyTest, BlockingLocalToGlobalBlock) array.barrier(); } -TEST_F(CopyTest, AsyncLocalToGlobPtr) +TEST_F(CopyTest, AsyncLocalToGlobPtrWait) { // Copy all elements contained in a single, continuous block. 
const int num_elem_per_unit = 5; @@ -443,14 +443,14 @@ TEST_F(CopyTest, AsyncLocalToGlobPtr) glob_ptr_t gptr_dest = static_cast( array.begin() + global_offset); - LOG_MESSAGE("CopyTest.AsyncLocalToGlobPtr: call copy_async"); + LOG_MESSAGE("CopyTest.AsyncLocalToGlobPtrWait: call copy_async"); auto copy_fut = dash::copy_async(local_range, local_range + num_elem_per_unit, gptr_dest); // Blocks until remote completion: - LOG_MESSAGE("CopyTest.AsyncLocalToGlobPtr: call fut.wait"); + LOG_MESSAGE("CopyTest.AsyncLocalToGlobPtrWait: call fut.wait"); copy_fut.wait(); array.barrier(); @@ -463,6 +463,56 @@ TEST_F(CopyTest, AsyncLocalToGlobPtr) array.barrier(); } + +TEST_F(CopyTest, AsyncLocalToGlobPtrTest) +{ + // Copy all elements contained in a single, continuous block. + const int num_elem_per_unit = 5; + size_t num_elem_total = _dash_size * num_elem_per_unit; + + // Global target range: + dash::Array array(num_elem_total, dash::BLOCKED); + // Local range to copy: + int local_range[num_elem_per_unit]; + + // Assign initial values: [ 1000, 1001, 1002, ... 2000, 2001, ... ] + for (auto l = 0; l < num_elem_per_unit; ++l) { + array.local[l] = ((dash::myid() + 1) * 110000) + l; + local_range[l] = ((dash::myid() + 1) * 1000) + l; + } + array.barrier(); + + // Copy values from local range to remote global range. + // Each unit (u) copies into block ((u + 1) % nunits), so the last unit + // copies into the first block.
+ auto block_offset = (dash::myid() + 1) % dash::size(); + auto global_offset = block_offset * num_elem_per_unit; + + using glob_it_t = decltype(array.begin()); + using glob_ptr_t = typename glob_it_t::pointer; + + glob_ptr_t gptr_dest = static_cast( + array.begin() + global_offset); + LOG_MESSAGE("CopyTest.AsyncLocalToGlobPtrTest: call copy_async"); + + auto copy_fut = dash::copy_async(local_range, + local_range + num_elem_per_unit, + gptr_dest); + + // Blocks until remote completion: + LOG_MESSAGE("CopyTest.AsyncLocalToGlobPtrTest: call fut.test"); + while (!copy_fut.test()) {} + + array.barrier(); + + for (auto l = 0; l < num_elem_per_unit; ++l) { + // Compare local buffer and global array dest range: + EXPECT_EQ_U(local_range[l], + static_cast(array[global_offset + l])); + } + array.barrier(); +} + TEST_F(CopyTest, BlockingGlobalToLocalSubBlock) { // Copy all elements contained in a single, continuous block, @@ -729,7 +779,7 @@ TEST_F(CopyTest, AsyncGlobalToLocalTiles) auto req = dash::copy_async(gblock_a.begin(), gblock_a.end(), matrix_b_dest); - req_handles.push_back(req); + req_handles.push_back(std::move(req)); dst_pointers.push_back(matrix_b_dest); } @@ -743,7 +793,7 @@ TEST_F(CopyTest, AsyncGlobalToLocalTiles) // To prevent compiler from removing work load loop in optimization: LOG_MESSAGE("Dummy result: %f", m); - for (auto req : req_handles) { + for (auto& req : req_handles) { // Wait for completion of async copy operation. // Returns pointer to final element copied into target range: value_t * copy_dest_end = req.get(); @@ -769,7 +819,7 @@ TEST_F(CopyTest, AsyncGlobalToLocalTiles) } } -TEST_F(CopyTest, AsyncGlobalToLocalBlock) +TEST_F(CopyTest, AsyncGlobalToLocalBlockWait) { // Copy all elements contained in a single, continuous block. const int num_elem_per_unit = 20; @@ -803,6 +853,42 @@ TEST_F(CopyTest, AsyncGlobalToLocalBlock) } } +TEST_F(CopyTest, AsyncGlobalToLocalTest) +{ + // Copy all elements contained in a single, continuous block. 
+ const int num_elem_per_unit = 20; + size_t num_elem_total = _dash_size * num_elem_per_unit; + + dash::Array array(num_elem_total, dash::BLOCKED); + + EXPECT_EQ_U(num_elem_per_unit, array.local.size()); + EXPECT_EQ_U(num_elem_per_unit, array.lsize()); + + // Assign initial values: [ 1000, 1001, 1002, ... 2000, 2001, ... ] + for (auto l = 0; l < num_elem_per_unit; ++l) { + array.local[l] = ((dash::myid() + 1) * 1000) + l; + } + array.barrier(); + + // Local range to store copy: + int local_copy[num_elem_per_unit]; + + // Copy values from global range to local memory. + // All units copy first block, so unit 0 tests local-to-local copying. + auto dest_end = dash::copy_async(array.begin(), + array.begin() + num_elem_per_unit, + local_copy); + + // spin until the transfer is completed + while (!dest_end.test()) { } + + EXPECT_EQ_U(num_elem_per_unit, dest_end.get() - local_copy); + for (auto l = 0; l < num_elem_per_unit; ++l) { + EXPECT_EQ_U(static_cast(array[l]), + local_copy[l]); + } +} + #if 0 // TODO TEST_F(CopyTest, AsyncAllToLocalVector)