From 121ef4bcfca12c1fa964632484410fd5da4d1309 Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Wed, 16 Aug 2017 15:02:56 +0200 Subject: [PATCH 01/11] Add dart_handle_free and beef up dash::Future --- .../include/dash/dart/if/dart_communication.h | 12 ++ dart-impl/mpi/src/dart_communication.c | 10 + dash/include/dash/Future.h | 71 +++++-- dash/include/dash/algorithm/Copy.h | 187 +++++++++++------- 4 files changed, 188 insertions(+), 92 deletions(-) diff --git a/dart-if/include/dash/dart/if/dart_communication.h b/dart-if/include/dash/dart/if/dart_communication.h index 26004704b..77e064222 100644 --- a/dart-if/include/dash/dart/if/dart_communication.h +++ b/dart-if/include/dash/dart/if/dart_communication.h @@ -589,6 +589,18 @@ dart_ret_t dart_testall_local( size_t n, int32_t * result) DART_NOTHROW; + +/** + * Free the handle without testing or waiting for completion of the operation. + * + * \param handle Pointer to the handle to free. + * + * \return \c DART_OK on success, any other of \ref dart_ret_t otherwise. + * + */ +dart_ret_t dart_handle_free( + dart_handle_t * handle) DART_NOTHROW; + /** \} */ /** diff --git a/dart-impl/mpi/src/dart_communication.c b/dart-impl/mpi/src/dart_communication.c index c2525fd5f..d5a944f1a 100644 --- a/dart-impl/mpi/src/dart_communication.c +++ b/dart-impl/mpi/src/dart_communication.c @@ -1538,6 +1538,16 @@ dart_ret_t dart_testall_local( return DART_OK; } +dart_ret_t dart_handle_free( + dart_handle_t * handleptr) +{ + if (handleptr != NULL && *handleptr != DART_HANDLE_NULL) { + free(*handleptr); + *handleptr = DART_HANDLE_NULL; + } + return DART_OK; +} + /* -- Dart collective operations -- */ static int _dart_barrier_count = 0; diff --git a/dash/include/dash/Future.h b/dash/include/dash/Future.h index ab796ba25..8c0def7d9 100644 --- a/dash/include/dash/Future.h +++ b/dash/include/dash/Future.h @@ -16,14 +16,17 @@ template class Future { private: - typedef Future self_t; - typedef std::function func_t; + typedef Future self_t; + typedef std::function get_func_t; + typedef std::function test_func_t; + typedef std::function destroy_func_t; private: - func_t _func; - ResultT _value; - bool _ready = false; - bool _has_func = false; + get_func_t _get_func; + test_func_t _test_func; + destroy_func_t _destroy_func; + ResultT _value; + bool _ready = false; public: // For ostream output @@ -34,31 +37,53 @@ class Future public: Future() - : _ready(false), - _has_func(false) + : _ready(false) { } - Future(const func_t & func) - : _func(func), - _ready(false), - _has_func(true) + Future(ResultT & result) + : _value(result), + _ready(true) + { } + + Future(const get_func_t & func) + : _get_func(func), + _ready(false) + { } + + + Future( + const get_func_t & get_func, + const test_func_t & test_func, + const destroy_func_t & destroy_func) + : _get_func(get_func), + _test_func(test_func), + _destroy_func(destroy_func), + _ready(false) { } Future( const self_t & other) - : _func(other._func), + : _get_func(other._get_func), + _test_func(other._test_func), + _destroy_func(other._destroy_func), _value(other._value), - _ready(other._ready), - _has_func(other._has_func) + _ready(other._ready) { } + ~Future() { + if (_destroy_func) { + _destroy_func(); + } + } + Future & operator=(const self_t & other) { if (this != &other) { - _func = other._func; - _value = other._value; - _ready = other._ready; - _has_func = other._has_func; + _get_func = other._get_func; + _test_func = other._test_func; + _destroy_func = other._destroy_func; + _value = other._value; + _ready = other._ready; } return *this; } @@ -69,19 +94,23 @@ class Future if (_ready) { return; } - if (!_has_func) { + if (!_get_func) { DASH_LOG_ERROR("Future.wait()", "No function"); DASH_THROW( dash::exception::RuntimeError, "Future not initialized with function"); } - _value = _func(); + _value = _get_func(); _ready = true; DASH_LOG_TRACE_VAR("Future.wait >", _ready); } bool test() const { + if (!_ready && _test_func) { + // do not set _ready here because we might have to call _get_func() above + return _test_func(); + } return _ready; } diff --git a/dash/include/dash/algorithm/Copy.h b/dash/include/dash/algorithm/Copy.h index f89716e22..e1b595ed8 100644 --- a/dash/include/dash/algorithm/Copy.h +++ b/dash/include/dash/algorithm/Copy.h @@ -15,8 +15,8 @@ // #ifndef DASH__ALGORITHM__COPY__USE_WAIT -#define DASH__ALGORITHM__COPY__USE_FLUSH -// #define DASH__ALGORITHM__COPY__USE_WAIT +//#define DASH__ALGORITHM__COPY__USE_FLUSH + #define DASH__ALGORITHM__COPY__USE_WAIT // #endif namespace dash { @@ -270,7 +270,7 @@ dash::Future copy_async_impl( size_type num_elem_total = dash::distance(in_first, in_last); if (num_elem_total <= 0) { DASH_LOG_TRACE("dash::copy_async_impl", "input range empty"); - return dash::Future([=]() { return out_first; }); + return dash::Future(out_first); } DASH_LOG_TRACE("dash::copy_async_impl", "total elements:", num_elem_total, @@ -291,10 +291,12 @@ dash::Future copy_async_impl( // Accessed global pointers to be flushed: #ifdef DASH__ALGORITHM__COPY__USE_FLUSH - std::vector req_handles; + using req_handles_t = ::std::vector; #else - std::vector req_handles; + using req_handles_t = ::std::vector; #endif + using shared_ptr_t = std::shared_ptr; + shared_ptr_t req_handles = std::make_shared(); // MPI uses offset type int, do not copy more than INT_MAX bytes: size_type max_copy_elem = (std::numeric_limits::max() / @@ -335,7 +337,7 @@ dash::Future copy_async_impl( ds.nelem, ds.dtype), DART_OK); - req_handles.push_back(in_first.dart_gptr()); + req_handles->push_back(in_first.dart_gptr()); #else dart_handle_t get_handle; dash::dart_storage ds(num_copy_elem); @@ -348,7 +350,7 @@ dash::Future copy_async_impl( &get_handle), DART_OK); if (get_handle != NULL) { - req_handles.push_back(get_handle); + req_handles->push_back(get_handle); } #endif num_elem_copied += num_copy_elem; @@ -409,7 +411,7 @@ dash::Future copy_async_impl( DASH_THROW( dash::exception::RuntimeError, "dart_get failed"); } - req_handles.push_back(src_gptr); + req_handles->push_back(src_gptr); #else dart_handle_t get_handle; dash::dart_storage ds(num_copy_elem); @@ -422,7 +424,7 @@ dash::Future copy_async_impl( &get_handle), DART_OK); if (get_handle != NULL) { - req_handles.push_back(get_handle); + req_handles->push_back(get_handle); } #endif num_elem_copied += num_copy_elem; @@ -433,41 +435,62 @@ dash::Future copy_async_impl( DASH_LOG_TRACE("dash::copy_async_impl", " req_handle:", gptr); } #endif - dash::Future result([=]() mutable { - // Wait for all get requests to complete: - ValueType * _out = out_first + num_elem_copied; - DASH_LOG_TRACE("dash::copy_async_impl [Future]()", - " wait for", req_handles.size(), "async get request"); - DASH_LOG_TRACE("dash::copy_async_impl [Future]", " flush:", req_handles); - DASH_LOG_TRACE("dash::copy_async_impl [Future]", " _out:", _out); -#ifdef DASH_ENABLE_TRACE_LOGGING - for (auto gptr : req_handles) { - DASH_LOG_TRACE("dash::copy_async_impl [Future]", " req_handle:", - gptr); - } -#endif -#ifdef DASH__ALGORITHM__COPY__USE_FLUSH - for (auto gptr : req_handles) { - dart_flush_local_all(gptr); + dash::Future result( + [=]() mutable { + // Wait for all get requests to complete: + ValueType * _out = out_first + num_elem_copied; + DASH_LOG_TRACE("dash::copy_async_impl [Future]()", + " wait for", req_handles->size(), "async get request"); + DASH_LOG_TRACE("dash::copy_async_impl [Future]", " flush:", req_handles); + DASH_LOG_TRACE("dash::copy_async_impl [Future]", " _out:", _out); + #ifdef DASH_ENABLE_TRACE_LOGGING + for (auto gptr : *req_handles) { + DASH_LOG_TRACE("dash::copy_async_impl [Future]", " req_handle:", + gptr); + } + #endif + #ifdef DASH__ALGORITHM__COPY__USE_FLUSH + for (auto gptr : *req_handles) { + dart_flush_local_all(gptr); + } + #else + if (req_handles->size() > 0) { + if (dart_waitall_local(req_handles->data(), req_handles->size()) + != DART_OK) { + DASH_LOG_ERROR("dash::copy_async_impl [Future]", + " dart_waitall_local failed"); + DASH_THROW( + dash::exception::RuntimeError, + "dash::copy_async_impl [Future]: dart_waitall_local failed"); + } + } else { + DASH_LOG_TRACE("dash::copy_async_impl [Future]", " No pending handles"); + } + #endif + DASH_LOG_TRACE("dash::copy_async_impl [Future] >", + " async requests completed, _out:", _out); + return _out; } -#else - if (req_handles.size() > 0) { - if (dart_waitall_local(&req_handles[0], req_handles.size()) - != DART_OK) { - DASH_LOG_ERROR("dash::copy_async_impl [Future]", - " dart_waitall_local failed"); - DASH_THROW( - dash::exception::RuntimeError, - "dash::copy_async_impl [Future]: dart_waitall_local failed"); +#ifndef DASH__ALGORITHM__COPY__USE_FLUSH + , + // test + [=]() mutable { + int32_t flag; + dart_testall_local(req_handles->data(), req_handles->size(), &flag); + if (flag) { + req_handles->clear(); + } + return (flag != 0); + }, + // destroy + [=]() mutable { + for (auto& handle : *req_handles) { + dart_handle_free(&handle); } - } else { - DASH_LOG_TRACE("dash::copy_async_impl [Future]", " No pending handles"); } + #endif - DASH_LOG_TRACE("dash::copy_async_impl [Future] >", - " async requests completed, _out:", _out); - return _out; - }); + ); DASH_LOG_TRACE("dash::copy_async_impl >", " returning future"); return result; } @@ -529,10 +552,12 @@ dash::Future copy_async_impl( // Accessed global pointers to be flushed: #ifdef DASH__ALGORITHM__COPY__USE_FLUSH - std::vector req_handles; + using req_handles_t = ::std::vector; #else - std::vector req_handles; + using req_handles_t = ::std::vector; #endif + using shared_ptr_t = std::shared_ptr; + shared_ptr_t req_handles = std::make_shared(); auto num_copy_elem = std::distance(in_first, in_last); auto src_ptr = in_first; @@ -562,7 +587,7 @@ dash::Future copy_async_impl( &put_handle), DART_OK); if (put_handle != NULL) { - req_handles.push_back(put_handle); + req_handles->push_back(put_handle); } #endif @@ -571,41 +596,61 @@ dash::Future copy_async_impl( DASH_LOG_TRACE("dash::copy_async_impl", " req_handle:", gptr); } #endif - dash::Future result([=]() mutable { - // Wait for all get requests to complete: - GlobOutputIt _out = out_first + num_copy_elem; - DASH_LOG_TRACE("dash::copy_async_impl [Future]()", - " wait for", req_handles.size(), "async put request"); - DASH_LOG_TRACE("dash::copy_async_impl [Future]", " flush:", req_handles); - DASH_LOG_TRACE("dash::copy_async_impl [Future]", " _out:", _out); + dash::Future result( + [=]() mutable { + // Wait for all get requests to complete: + GlobOutputIt _out = out_first + num_copy_elem; + DASH_LOG_TRACE("dash::copy_async_impl [Future]()", + " wait for", req_handles.size(), "async put request"); + DASH_LOG_TRACE("dash::copy_async_impl [Future]", " flush:", req_handles); + DASH_LOG_TRACE("dash::copy_async_impl [Future]", " _out:", _out); #ifdef DASH_ENABLE_TRACE_LOGGING - for (auto gptr : req_handles) { - DASH_LOG_TRACE("dash::copy_async_impl [Future]", " req_handle:", - gptr); - } + for (auto gptr : req_handles) { + DASH_LOG_TRACE("dash::copy_async_impl [Future]", " req_handle:", + gptr); + } #endif #ifdef DASH__ALGORITHM__COPY__USE_FLUSH - for (auto gptr : req_handles) { - dart_flush_all(gptr); - } + for (auto gptr : *req_handles) { + dart_flush_all(gptr); + } #else - if (req_handles.size() > 0) { - if (dart_waitall(&req_handles[0], req_handles.size()) - != DART_OK) { - DASH_LOG_ERROR("dash::copy_async_impl [Future]", - " dart_waitall failed"); - DASH_THROW( - dash::exception::RuntimeError, - "dash::copy_async_impl [Future]: dart_waitall failed"); + if (req_handles->size() > 0) { + if (dart_waitall(req_handles->data(), req_handles->size()) + != DART_OK) { + DASH_LOG_ERROR("dash::copy_async_impl [Future]", + " dart_waitall failed"); + DASH_THROW( + dash::exception::RuntimeError, + "dash::copy_async_impl [Future]: dart_waitall failed"); + } + } else { + DASH_LOG_TRACE("dash::copy_async_impl [Future]", " No pending handles"); + } +#endif + DASH_LOG_TRACE("dash::copy_async_impl [Future] >", + " async requests completed, _out:", _out); + return _out; + } +#ifndef DASH__ALGORITHM__COPY__USE_FLUSH + , + // test + [=]() mutable { + int32_t flag; + dart_testall_local(req_handles->data(), req_handles->size(), &flag); + if (flag) { + req_handles->clear(); + } + return (flag != 0); + }, + // destroy + [=]() mutable { + for (auto& handle : *req_handles) { + dart_handle_free(&handle); } - } else { - DASH_LOG_TRACE("dash::copy_async_impl [Future]", " No pending handles"); } #endif - DASH_LOG_TRACE("dash::copy_async_impl [Future] >", - " async requests completed, _out:", _out); - return _out; - }); + ); DASH_LOG_TRACE("dash::copy_async_impl >", " returning future"); return result; } From 13c29f713e66bacdb34edeaab4df8d68a48c7bad Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Tue, 17 Oct 2017 22:19:16 +0200 Subject: [PATCH 02/11] Future: Fix semantics of _test() function --- dash/include/dash/Future.h | 6 +++--- dash/include/dash/algorithm/Copy.h | 23 ++++++++++++++--------- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/dash/include/dash/Future.h b/dash/include/dash/Future.h index 8c0def7d9..b392abdd6 100644 --- a/dash/include/dash/Future.h +++ b/dash/include/dash/Future.h @@ -18,7 +18,7 @@ class Future private: typedef Future self_t; typedef std::function get_func_t; - typedef std::function test_func_t; + typedef std::function test_func_t; typedef std::function destroy_func_t; private: @@ -36,6 +36,7 @@ class Future const Future & future); public: + Future() : _ready(false) { } @@ -108,8 +109,7 @@ class Future bool test() const { if (!_ready && _test_func) { - // do not set _ready here because we might have to call _get_func() above - return _test_func(); + _ready = _test_func(&_value); } return _ready; } diff --git a/dash/include/dash/algorithm/Copy.h b/dash/include/dash/algorithm/Copy.h index e1b595ed8..7a023e945 100644 --- a/dash/include/dash/algorithm/Copy.h +++ b/dash/include/dash/algorithm/Copy.h @@ -431,11 +431,12 @@ dash::Future copy_async_impl( } } #ifdef DASH_ENABLE_TRACE_LOGGING - for (auto gptr : req_handles) { + for (auto gptr : *req_handles) { DASH_LOG_TRACE("dash::copy_async_impl", " req_handle:", gptr); } #endif dash::Future result( + // get [=]() mutable { // Wait for all get requests to complete: ValueType * _out = out_first + num_elem_copied; @@ -474,11 +475,12 @@ dash::Future copy_async_impl( #ifndef DASH__ALGORITHM__COPY__USE_FLUSH , // test - [=]() mutable { + [=](ValueType ** out) mutable { int32_t flag; dart_testall_local(req_handles->data(), req_handles->size(), &flag); if (flag) { req_handles->clear(); + *out = out_first + num_elem_copied; } return (flag != 0); }, @@ -574,7 +576,7 @@ dash::Future copy_async_impl( DASH_THROW( dash::exception::RuntimeError, "dart_put failed"); } - req_handles.push_back(dest_gptr); + req_handles->push_back(dest_gptr); #else dart_handle_t put_handle; dash::dart_storage ds(num_copy_elem); @@ -592,20 +594,21 @@ dash::Future copy_async_impl( #endif #ifdef DASH_ENABLE_TRACE_LOGGING - for (auto gptr : req_handles) { + for (auto gptr : *req_handles) { DASH_LOG_TRACE("dash::copy_async_impl", " req_handle:", gptr); } #endif dash::Future result( + // get [=]() mutable { // Wait for all get requests to complete: GlobOutputIt _out = out_first + num_copy_elem; DASH_LOG_TRACE("dash::copy_async_impl [Future]()", - " wait for", req_handles.size(), "async put request"); - DASH_LOG_TRACE("dash::copy_async_impl [Future]", " flush:", req_handles); + " wait for", req_handles->size(), "async put request"); + DASH_LOG_TRACE("dash::copy_async_impl [Future]", " flush:", *req_handles); DASH_LOG_TRACE("dash::copy_async_impl [Future]", " _out:", _out); #ifdef DASH_ENABLE_TRACE_LOGGING - for (auto gptr : req_handles) { + for (auto gptr : *req_handles) { DASH_LOG_TRACE("dash::copy_async_impl [Future]", " req_handle:", gptr); } @@ -628,6 +631,7 @@ dash::Future copy_async_impl( DASH_LOG_TRACE("dash::copy_async_impl [Future]", " No pending handles"); } #endif + req_handles->clear(); DASH_LOG_TRACE("dash::copy_async_impl [Future] >", " async requests completed, _out:", _out); return _out; @@ -635,11 +639,12 @@ dash::Future copy_async_impl( #ifndef DASH__ALGORITHM__COPY__USE_FLUSH , // test - [=]() mutable { + [=](GlobOutputIt *out) mutable { int32_t flag; dart_testall_local(req_handles->data(), req_handles->size(), &flag); if (flag) { req_handles->clear(); + *out = out_first + num_copy_elem; } return (flag != 0); }, @@ -912,7 +917,7 @@ ValueType * copy( auto total_copy_elem = in_last - in_first; // Instead of testing in_first.local() and in_last.local(), this test for - // a local-only range only requires one call to in_first.local() which + // a local-only range only requires one call to in_first.local() which // increases throughput by ~10% for local ranges. if (num_local_elem == total_copy_elem) { // Entire input range is local: From e18e6fe2d4a78ce7ef66631dc7d2d1e93f71db29 Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Tue, 14 Nov 2017 13:11:58 +0900 Subject: [PATCH 03/11] Simplify dash::internal::copy_impl by letting DART handle large transfers --- dash/include/dash/algorithm/Copy.h | 54 ++++++++---------------------- 1 file changed, 14 insertions(+), 40 deletions(-) diff --git a/dash/include/dash/algorithm/Copy.h b/dash/include/dash/algorithm/Copy.h index 7a023e945..5e6e980b3 100644 --- a/dash/include/dash/algorithm/Copy.h +++ b/dash/include/dash/algorithm/Copy.h @@ -144,46 +144,23 @@ ValueType * copy_impl( auto unit_last = pattern.unit_at(g_in_last.pos() - 1); DASH_LOG_TRACE_VAR("dash::copy_impl", unit_last); - // MPI uses offset type int, do not copy more than INT_MAX bytes: - size_type max_copy_elem = (std::numeric_limits::max() / - sizeof(ValueType)); size_type num_elem_copied = 0; - DASH_LOG_TRACE_VAR("dash::copy_impl", max_copy_elem); - if (num_elem_total > max_copy_elem) { - DASH_LOG_DEBUG("dash::copy_impl", - "cannot copy", num_elem_total, "elements", - "in a single dart_get operation"); - } if (unit_first == unit_last) { // Input range is located at a single remote unit: DASH_LOG_TRACE("dash::copy_impl", "input range at single unit"); - while (num_elem_copied < num_elem_total) { - // Number of elements left to copy: - auto total_elem_left = num_elem_total - num_elem_copied; - auto num_copy_elem = (num_elem_total > max_copy_elem) - ? max_copy_elem - : num_elem_total; - if (num_copy_elem > total_elem_left) { - num_copy_elem = total_elem_left; - } - DASH_LOG_TRACE("dash::copy_impl", - "copy max:", max_copy_elem, - "get elements:", num_copy_elem, - "total:", num_elem_total, - "copied:", num_elem_copied, - "left:", total_elem_left); - auto cur_in_first = g_in_first + num_elem_copied; - auto cur_out_first = out_first + num_elem_copied; - dash::dart_storage ds(num_copy_elem); - DASH_ASSERT_RETURNS( - dart_get_blocking( - cur_out_first, - cur_in_first.dart_gptr(), - ds.nelem, - ds.dtype), - DART_OK); - num_elem_copied += num_copy_elem; - } + DASH_LOG_TRACE("dash::copy_impl", + "get elements:", num_elem_total); + auto cur_in_first = g_in_first; + auto cur_out_first = out_first; + dash::dart_storage ds(num_elem_total); + DASH_ASSERT_RETURNS( + dart_get_blocking( + cur_out_first, + cur_in_first.dart_gptr(), + ds.nelem, + ds.dtype), + DART_OK); + num_elem_copied = num_elem_total; } else { // Input range is spread over several remote units: DASH_LOG_TRACE("dash::copy_impl", "input range spans multiple units"); @@ -205,9 +182,7 @@ ValueType * copy_impl( // Number of elements left to copy: auto total_elem_left = num_elem_total - num_elem_copied; // Number of elements to copy in this iteration. - auto num_copy_elem = (num_unit_elem < max_copy_elem) - ? num_unit_elem - : max_copy_elem; + auto num_copy_elem = num_unit_elem; if (num_copy_elem > total_elem_left) { num_copy_elem = total_elem_left; } @@ -221,7 +196,6 @@ ValueType * copy_impl( "->", "unit elements:", num_unit_elem, "max elem/unit:", max_elem_per_unit, - "copy max:", max_copy_elem, "get elements:", num_copy_elem, "total:", num_elem_total, "copied:", num_elem_copied, From 7d576d98f6b3d6f0ebd79c6a238c759dfb0f046b Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Tue, 14 Nov 2017 14:58:43 +0900 Subject: [PATCH 04/11] Add dart_testall which tests and waits for remote completion if necessary --- .../include/dash/dart/if/dart_communication.h | 24 ++++- dart-impl/mpi/src/dart_communication.c | 98 ++++++++++++++++--- 2 files changed, 105 insertions(+), 17 deletions(-) diff --git a/dart-if/include/dash/dart/if/dart_communication.h b/dart-if/include/dash/dart/if/dart_communication.h index e0fcfc02c..6c57dc3da 100644 --- a/dart-if/include/dash/dart/if/dart_communication.h +++ b/dart-if/include/dash/dart/if/dart_communication.h @@ -601,6 +601,24 @@ dart_ret_t dart_testall_local( size_t n, int32_t * result) DART_NOTHROW; +/** + * Test for the completion of operations and ensure remote completion. + * If the transfers completed, the handles are invalidated and may not be + * used in another \c dart_wait or \c dart_test operation. + * + * \param handles Array of handles of operations to test for completion. + * \param n Number of \c handles to test for completion. + * \param[out] result \c True if all operations have completed. + * + * \return \c DART_OK on success, any other of \ref dart_ret_t otherwise. + * + * \threadsafe + * \ingroup DartCommunication + */ +dart_ret_t dart_testall( + dart_handle_t handles[], + size_t n, + int32_t * is_finished); /** * Free the handle without testing or waiting for completion of the operation. @@ -718,18 +736,18 @@ dart_ret_t dart_recv( /** * DART Equivalent to MPI sendrecv. * - * \param sendbuf Buffer containing the data to be sent by the + * \param sendbuf Buffer containing the data to be sent by the * source unit. * \param send_nelem Number of values sentby the source unit. * \param send_dtype The data type of values in \c sendbuf. * \param dest Unitthe message is sent to. - * \param send_tag Message tag for the distinction between different + * \param send_tag Message tag for the distinction between different * messages of the source unit. * \param recvbuf Buffer for the incoming data. * \param recv_nelem Number of values received by the destination unit. * \param recv_dtype The data type of values in \c recvbuf. * \param src Unit sending the message. - * \param recv_tag Message tag for the distinction between different + * \param recv_tag Message tag for the distinction between different * messages of the destination unit. * * \return \c DART_OK on success, any other of \ref dart_ret_t otherwise. diff --git a/dart-impl/mpi/src/dart_communication.c b/dart-impl/mpi/src/dart_communication.c index 2aedfe93b..0d1623d34 100644 --- a/dart-impl/mpi/src/dart_communication.c +++ b/dart-impl/mpi/src/dart_communication.c @@ -50,7 +50,7 @@ * Temporary space allocation: * - on the stack for allocations <=64B * - on the heap otherwise - * Mainly meant to be used in dart_waitall_* and dart_testall_local + * Mainly meant to be used in dart_waitall* and dart_testall* */ #define ALLOC_TMP(__size) ((__size)<=64) ? alloca((__size)) : malloc((__size)) /** @@ -1284,6 +1284,27 @@ dart_ret_t dart_waitall_local( return ret; } +static +dart_ret_t wait_remote_completion( + dart_handle_t *handles, + size_t n +) +{ + for (size_t i = 0; i < n; i++) { + if (handles[i] != DART_HANDLE_NULL && handles[i]->needs_flush) { + DART_LOG_DEBUG("dart_waitall: -- MPI_Win_flush(handle[%zu]: %p, dest: %d))", + i, (void*)handles[i], handles[i]->dest); + /* + * MPI_Win_flush to wait for remote completion if required: + */ + if (MPI_Win_flush(handles[i]->dest, handles[i]->win) != MPI_SUCCESS) { + return DART_ERR_INVAL; + } + } + } + return DART_OK; +} + dart_ret_t dart_waitall( dart_handle_t handles[], size_t n) @@ -1356,19 +1377,10 @@ dart_ret_t dart_waitall( * wait for completion of MPI requests at origins and targets: */ DART_LOG_DEBUG("dart_waitall: waiting for remote completion"); - for (size_t i = 0; i < n; i++) { - if (handles[i] != DART_HANDLE_NULL && handles[i]->needs_flush) { - DART_LOG_DEBUG("dart_waitall: -- MPI_Win_flush(handle[%zu]: %p, dest: %d))", - i, (void*)handles[i], handles[i]->dest); - /* - * MPI_Win_flush to wait for remote completion if required: - */ - if (MPI_Win_flush(handles[i]->dest, handles[i]->win) != MPI_SUCCESS) { - DART_LOG_ERROR("dart_waitall: MPI_Win_flush failed"); - FREE_TMP(2 * n * sizeof(MPI_Request), mpi_req); - return DART_ERR_INVAL; - } - } + if (DART_OK != wait_remote_completion(handles, n)) { + DART_LOG_ERROR("dart_waitall: MPI_Win_flush failed"); + FREE_TMP(2 * n * sizeof(MPI_Request), mpi_req); + return DART_ERR_OTHER; } /* @@ -1467,6 +1479,64 @@ dart_ret_t dart_testall_local( return DART_OK; } + +dart_ret_t dart_testall( + dart_handle_t handles[], + size_t n, + int32_t * is_finished) +{ + DART_LOG_DEBUG("dart_testall_local()"); + if (handles == NULL || n == 0) { + DART_LOG_DEBUG("dart_testall_local: empty handles"); + return DART_OK; + } + + MPI_Request *mpi_req = ALLOC_TMP(2 * n * sizeof (MPI_Request)); + size_t r_n = 0; + for (size_t i = 0; i < n; ++i) { + if (handles[i] != DART_HANDLE_NULL) { + for (uint8_t j = 0; j < handles[i]->num_reqs; ++j) { + if (handles[i]->reqs[j] != MPI_REQUEST_NULL){ + mpi_req[r_n] = handles[i]->reqs[j]; + ++r_n; + } + } + } + } + + if (r_n) { + if (MPI_Testall(r_n, mpi_req, is_finished, + MPI_STATUSES_IGNORE) != MPI_SUCCESS){ + FREE_TMP(2 * n * sizeof(MPI_Request), mpi_req); + DART_LOG_ERROR("dart_testall_local: MPI_Testall failed!"); + return DART_ERR_OTHER; + } + + if (*is_finished) { + /* + * wait for completion of MPI requests at origins and targets: + */ + DART_LOG_DEBUG("dart_testall: waiting for remote completion"); + if (DART_OK != wait_remote_completion(handles, n)) { + DART_LOG_ERROR("dart_testall: MPI_Win_flush failed"); + FREE_TMP(2 * n * sizeof(MPI_Request), mpi_req); + return DART_ERR_OTHER; + } + + for (size_t i = 0; i < n; i++) { + if (handles[i] != DART_HANDLE_NULL) { + // free the handle + free(handles[i]); + handles[i] = DART_HANDLE_NULL; + } + } + } + } + FREE_TMP(2 * n * sizeof(MPI_Request), mpi_req); + DART_LOG_DEBUG("dart_testall_local > finished"); + return DART_OK; +} + dart_ret_t dart_handle_free( dart_handle_t * handleptr) { From 1ed63cea2d7ec915d8eb120d67bf2064578bcd69 Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Tue, 14 Nov 2017 15:00:13 +0900 Subject: [PATCH 05/11] Make dash::Future non-copyable and movable --- dash/include/dash/Future.h | 38 ++++++++++++++------------------------ 1 file changed, 14 insertions(+), 24 deletions(-) diff --git a/dash/include/dash/Future.h b/dash/include/dash/Future.h index b392abdd6..e0e903da6 100644 --- a/dash/include/dash/Future.h +++ b/dash/include/dash/Future.h @@ -47,10 +47,15 @@ class Future { } Future(const get_func_t & func) - : _get_func(func), - _ready(false) + : _get_func(func) { } + Future( + const get_func_t & get_func, + const test_func_t & test_func) + : _get_func(get_func), + _test_func(test_func) + { } Future( const get_func_t & get_func, @@ -58,18 +63,11 @@ class Future const destroy_func_t & destroy_func) : _get_func(get_func), _test_func(test_func), - _destroy_func(destroy_func), - _ready(false) + _destroy_func(destroy_func) { } - Future( - const self_t & other) - : _get_func(other._get_func), - _test_func(other._test_func), - _destroy_func(other._destroy_func), - _value(other._value), - _ready(other._ready) - { } + Future(const self_t& other) = delete; + Future(self_t&& other) = default; ~Future() { if (_destroy_func) { @@ -77,17 +75,9 @@ class Future } } - Future & operator=(const self_t & other) - { - if (this != &other) { - _get_func = other._get_func; - _test_func = other._test_func; - _destroy_func = other._destroy_func; - _value = other._value; - _ready = other._ready; - } - return *this; - } + /// copy-assignment is not permitted + Future & operator=(const self_t& other) = delete; + Future & operator=(self_t&& other) = default; void wait() { @@ -106,7 +96,7 @@ class Future DASH_LOG_TRACE_VAR("Future.wait >", _ready); } - bool test() const + bool test() { if (!_ready && _test_func) { _ready = _test_func(&_value); From 2476493fe726be98aaebd65a5951453b11ff2076 Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Tue, 14 Nov 2017 15:01:57 +0900 Subject: [PATCH 06/11] Use DART handles for blocking and non-blocking copies and remove duplicate code --- dash/include/dash/algorithm/Copy.h | 607 ++++++++--------------------- 1 file changed, 172 insertions(+), 435 deletions(-) diff --git a/dash/include/dash/algorithm/Copy.h b/dash/include/dash/algorithm/Copy.h index 5e6e980b3..cb0317600 100644 --- a/dash/include/dash/algorithm/Copy.h +++ b/dash/include/dash/algorithm/Copy.h @@ -13,12 +13,6 @@ #include #include - -// #ifndef DASH__ALGORITHM__COPY__USE_WAIT -//#define DASH__ALGORITHM__COPY__USE_FLUSH - #define DASH__ALGORITHM__COPY__USE_WAIT -// #endif - namespace dash { #ifdef DOXYGEN @@ -111,9 +105,10 @@ template < typename ValueType, class GlobInputIt > ValueType * copy_impl( - GlobInputIt in_first, - GlobInputIt in_last, - ValueType * out_first) + GlobInputIt in_first, + GlobInputIt in_last, + ValueType * out_first, + std::vector & handles) { DASH_LOG_TRACE("dash::copy_impl()", "in_first:", in_first.pos(), @@ -152,14 +147,17 @@ ValueType * copy_impl( "get elements:", num_elem_total); auto cur_in_first = g_in_first; auto cur_out_first = out_first; + dart_handle_t handle; dash::dart_storage ds(num_elem_total); DASH_ASSERT_RETURNS( - dart_get_blocking( + dart_get_handle( cur_out_first, cur_in_first.dart_gptr(), ds.nelem, - ds.dtype), + ds.dtype, + &handle), DART_OK); + handles.push_back(handle); num_elem_copied = num_elem_total; } else { // Input range is spread over several remote units: @@ -202,18 +200,21 @@ ValueType * copy_impl( "left:", total_elem_left); auto dest_ptr = out_first + num_elem_copied; auto src_gptr = cur_in_first.dart_gptr(); + dart_handle_t handle; dash::dart_storage ds(num_copy_elem); - if (dart_get_blocking( + if (dart_get_handle( dest_ptr, src_gptr, ds.nelem, - ds.dtype) + ds.dtype, + &handle) != DART_OK) { DASH_LOG_ERROR("dash::copy_impl", "dart_get failed"); DASH_THROW( dash::exception::RuntimeError, "dart_get failed"); } num_elem_copied += num_copy_elem; + handles.push_back(handle); } } @@ -222,255 +223,6 @@ ValueType * copy_impl( return out_last; } -/** - * Asynchronous implementation of \c dash::copy (global to local) without - * optimization for local subrange. - */ -template < - typename ValueType, - class GlobInputIt > -dash::Future copy_async_impl( - GlobInputIt in_first, - GlobInputIt in_last, - ValueType * out_first) -{ - DASH_LOG_TRACE("dash::copy_async_impl()", - "in_first:", in_first.pos(), - "in_last:", in_last.pos(), - "out_first:", out_first); - auto pattern = in_first.pattern(); - typedef typename decltype(pattern)::index_type index_type; - typedef typename decltype(pattern)::size_type size_type; - size_type num_elem_total = dash::distance(in_first, in_last); - if (num_elem_total <= 0) { - DASH_LOG_TRACE("dash::copy_async_impl", "input range empty"); - return dash::Future(out_first); - } - DASH_LOG_TRACE("dash::copy_async_impl", - "total elements:", num_elem_total, - "expected out_last:", out_first + num_elem_total); - // Input iterators could be relative to a view. Map first input iterator - // to global index range and use it to resolve last input iterator. - // Do not use in_last.global() as this would span over the relative input - // range. - auto g_in_first = in_first.global(); - auto g_in_last = g_in_first + num_elem_total; - DASH_LOG_TRACE("dash::copy_async_impl", - "g_in_first:", g_in_first.pos(), - "g_in_last:", g_in_last.pos()); - auto unit_first = pattern.unit_at(g_in_first.pos()); - DASH_LOG_TRACE_VAR("dash::copy_async_impl", unit_first); - auto unit_last = pattern.unit_at(g_in_last.pos() - 1); - DASH_LOG_TRACE_VAR("dash::copy_async_impl", unit_last); - - // Accessed global pointers to be flushed: -#ifdef DASH__ALGORITHM__COPY__USE_FLUSH - using req_handles_t = ::std::vector; -#else - using req_handles_t = ::std::vector; -#endif - using shared_ptr_t = std::shared_ptr; - shared_ptr_t req_handles = std::make_shared(); - - // MPI uses offset type int, do not copy more than INT_MAX bytes: - size_type max_copy_elem = (std::numeric_limits::max() / - sizeof(ValueType)); - size_type num_elem_copied = 0; - DASH_LOG_TRACE_VAR("dash::copy_async_impl", max_copy_elem); - if (num_elem_total > max_copy_elem) { - DASH_LOG_DEBUG("dash::copy_async_impl", - "cannot copy", num_elem_total, "elements", - "in a single dart_get operation"); - } - if (unit_first == unit_last) { - // Input range is located at a single remote unit: - DASH_LOG_TRACE("dash::copy_async_impl", "input range at single unit"); - while (num_elem_copied < num_elem_total) { - // Number of elements left to copy: - auto total_elem_left = num_elem_total - num_elem_copied; - auto num_copy_elem = (num_elem_total > max_copy_elem) - ? max_copy_elem - : num_elem_total; - if (num_copy_elem > total_elem_left) { - num_copy_elem = total_elem_left; - } - DASH_LOG_TRACE("dash::copy_async_impl", - "copy max:", max_copy_elem, - "get elements:", num_copy_elem, - "total:", num_elem_total, - "copied:", num_elem_copied, - "left:", total_elem_left); - auto cur_in_first = g_in_first + num_elem_copied; - auto cur_out_first = out_first + num_elem_copied; -#ifdef DASH__ALGORITHM__COPY__USE_FLUSH - dash::dart_storage ds(num_copy_elem); - DASH_ASSERT_RETURNS( - dart_get( - cur_out_first, - cur_in_first.dart_gptr(), - ds.nelem, - ds.dtype), - DART_OK); - req_handles->push_back(in_first.dart_gptr()); -#else - dart_handle_t get_handle; - dash::dart_storage ds(num_copy_elem); - DASH_ASSERT_RETURNS( - dart_get_handle( - cur_out_first, - cur_in_first.dart_gptr(), - ds.nelem, - ds.dtype, - &get_handle), - DART_OK); - if (get_handle != NULL) { - req_handles->push_back(get_handle); - } -#endif - num_elem_copied += num_copy_elem; - } - } else { - // Input range is spread over several remote units: - DASH_LOG_TRACE("dash::copy_async_impl", "input range spans multiple units"); - // - // Copy elements from every unit: - // - while (num_elem_copied < num_elem_total) { - // Global iterator pointing at begin of current unit's input range: - auto cur_in_first = g_in_first + num_elem_copied; - // unit and local index of first element in current range segment: - auto local_pos = pattern.local(static_cast( - cur_in_first.pos())); - // Number of elements located at current source unit: - size_type max_elem_per_unit = pattern.local_size(local_pos.unit); - // Local offset of first element in input range at current unit: - auto l_in_first_idx = local_pos.index; - // Maximum number of elements to copy from current unit: - auto num_unit_elem = max_elem_per_unit - l_in_first_idx; - // Number of elements left to copy: - auto total_elem_left = num_elem_total - num_elem_copied; - // Number of elements to copy in this iteration. - auto num_copy_elem = (num_unit_elem < max_copy_elem) - ? num_unit_elem - : max_copy_elem; - if (num_copy_elem > total_elem_left) { - num_copy_elem = total_elem_left; - } - DASH_ASSERT_GT(num_copy_elem, 0, - "Number of element to copy is 0"); - DASH_LOG_TRACE("dash::copy_async_impl", - "start g_idx:", cur_in_first.pos(), - "->", - "unit:", local_pos.unit, - "l_idx:", l_in_first_idx, - "->", - "unit elements:", num_unit_elem, - "max elem/unit:", max_elem_per_unit, - "copy max:", max_copy_elem, - "get elements:", num_copy_elem, - "total:", num_elem_total, - "copied:", num_elem_copied, - "left:", total_elem_left); - auto src_gptr = cur_in_first.dart_gptr(); - auto dest_ptr = out_first + num_elem_copied; -#ifdef DASH__ALGORITHM__COPY__USE_FLUSH - dash::dart_storage ds(num_copy_elem); - if (dart_get( - dest_ptr, - src_gptr, - ds.nelem, - ds.dtype) - != DART_OK) { - DASH_LOG_ERROR("dash::copy_async_impl", "dart_get failed"); - DASH_THROW( - dash::exception::RuntimeError, "dart_get failed"); - } - req_handles->push_back(src_gptr); -#else - dart_handle_t get_handle; - dash::dart_storage ds(num_copy_elem); - DASH_ASSERT_RETURNS( - dart_get_handle( - dest_ptr, - src_gptr, - ds.nelem, - ds.dtype, - &get_handle), - DART_OK); - if (get_handle != NULL) { - req_handles->push_back(get_handle); - } -#endif - num_elem_copied += num_copy_elem; - } - } -#ifdef DASH_ENABLE_TRACE_LOGGING - for (auto gptr : *req_handles) { - DASH_LOG_TRACE("dash::copy_async_impl", " req_handle:", gptr); - } -#endif - dash::Future result( - // get - [=]() mutable { - // Wait for all get requests to complete: - ValueType * _out = out_first + num_elem_copied; - DASH_LOG_TRACE("dash::copy_async_impl [Future]()", - " wait for", req_handles->size(), "async get request"); - DASH_LOG_TRACE("dash::copy_async_impl [Future]", " flush:", req_handles); - DASH_LOG_TRACE("dash::copy_async_impl [Future]", " _out:", _out); - #ifdef DASH_ENABLE_TRACE_LOGGING - for (auto gptr : *req_handles) { - DASH_LOG_TRACE("dash::copy_async_impl [Future]", " req_handle:", - gptr); - } - #endif - #ifdef DASH__ALGORITHM__COPY__USE_FLUSH - for (auto gptr : *req_handles) { - dart_flush_local_all(gptr); - } - #else - if (req_handles->size() > 0) { - if (dart_waitall_local(req_handles->data(), req_handles->size()) - != DART_OK) { - DASH_LOG_ERROR("dash::copy_async_impl [Future]", - " dart_waitall_local failed"); - DASH_THROW( - dash::exception::RuntimeError, - "dash::copy_async_impl [Future]: dart_waitall_local failed"); - } - } else { - DASH_LOG_TRACE("dash::copy_async_impl [Future]", " No pending handles"); - } - #endif - DASH_LOG_TRACE("dash::copy_async_impl [Future] >", - " async requests completed, _out:", _out); - return _out; - } -#ifndef DASH__ALGORITHM__COPY__USE_FLUSH - , - // test - [=](ValueType ** out) mutable { - int32_t flag; - dart_testall_local(req_handles->data(), req_handles->size(), &flag); - if (flag) { - req_handles->clear(); - *out = out_first + num_elem_copied; - } - return (flag != 0); - }, - // destroy - [=]() mutable { - for (auto& handle : *req_handles) { - dart_handle_free(&handle); - } - } - -#endif - ); - DASH_LOG_TRACE("dash::copy_async_impl >", " returning future"); - return result; -} - // ========================================================================= // Local to Global // ========================================================================= @@ -483,9 +235,10 @@ template < typename ValueType, class GlobOutputIt > GlobOutputIt copy_impl( - ValueType * in_first, - ValueType * in_last, - GlobOutputIt out_first) + ValueType * in_first, + ValueType * in_last, + GlobOutputIt out_first, + std::vector & handles) { DASH_LOG_TRACE("dash::copy_impl()", "l_in_first:", in_first, @@ -493,14 +246,17 @@ GlobOutputIt copy_impl( "g_out_first:", out_first.pos()); auto num_elements = std::distance(in_first, in_last); + dart_handle_t handle; dash::dart_storage ds(num_elements); DASH_ASSERT_RETURNS( - dart_put_blocking( + dart_put_handle( out_first.dart_gptr(), in_first, ds.nelem, - ds.dtype), + ds.dtype, + &handle), DART_OK); + handles.push_back(handle); auto out_last = out_first + num_elements; DASH_LOG_TRACE("dash::copy_impl >", @@ -508,132 +264,6 @@ GlobOutputIt copy_impl( return out_last; } - -/** - * Asynchronous implementation of \c dash::copy (local to global) without - * optimization for local subrange. - */ -template < - typename ValueType, - class GlobOutputIt > -dash::Future copy_async_impl( - ValueType * in_first, - ValueType * in_last, - GlobOutputIt out_first) -{ - DASH_LOG_TRACE("dash::copy_async_impl()", - "l_in_first:", in_first, - "l_in_last:", in_last, - "g_out_first:", out_first.dart_gptr()); - - // Accessed global pointers to be flushed: -#ifdef DASH__ALGORITHM__COPY__USE_FLUSH - using req_handles_t = ::std::vector; -#else - using req_handles_t = ::std::vector; -#endif - using shared_ptr_t = std::shared_ptr; - shared_ptr_t req_handles = std::make_shared(); - - auto num_copy_elem = std::distance(in_first, in_last); - auto src_ptr = in_first; - auto dest_gptr = out_first.dart_gptr(); -#ifdef DASH__ALGORITHM__COPY__USE_FLUSH - dash::dart_storage ds(num_copy_elem); - if (dart_put( - dest_gptr, - src_ptr, - ds.nelem, - ds.dtype) - != DART_OK) { - DASH_LOG_ERROR("dash::copy_async_impl", "dart_put failed"); - DASH_THROW( - dash::exception::RuntimeError, "dart_put failed"); - } - req_handles->push_back(dest_gptr); -#else - dart_handle_t put_handle; - dash::dart_storage ds(num_copy_elem); - DASH_ASSERT_RETURNS( - dart_put_handle( - dest_gptr, - src_ptr, - ds.nelem, - ds.dtype, - &put_handle), - DART_OK); - if (put_handle != NULL) { - req_handles->push_back(put_handle); - } -#endif - -#ifdef DASH_ENABLE_TRACE_LOGGING - for (auto gptr : *req_handles) { - DASH_LOG_TRACE("dash::copy_async_impl", " req_handle:", gptr); - } -#endif - dash::Future result( - // get - [=]() mutable { - // Wait for all get requests to complete: - GlobOutputIt _out = out_first + num_copy_elem; - DASH_LOG_TRACE("dash::copy_async_impl [Future]()", - " wait for", req_handles->size(), "async put request"); - DASH_LOG_TRACE("dash::copy_async_impl [Future]", " flush:", *req_handles); - DASH_LOG_TRACE("dash::copy_async_impl [Future]", " _out:", _out); -#ifdef DASH_ENABLE_TRACE_LOGGING - for (auto gptr : *req_handles) { - DASH_LOG_TRACE("dash::copy_async_impl [Future]", " req_handle:", - gptr); - } -#endif -#ifdef DASH__ALGORITHM__COPY__USE_FLUSH - for (auto gptr : *req_handles) { - dart_flush_all(gptr); - } -#else - if (req_handles->size() > 0) { - if (dart_waitall(req_handles->data(), req_handles->size()) - != DART_OK) { - DASH_LOG_ERROR("dash::copy_async_impl [Future]", - " dart_waitall failed"); - DASH_THROW( - dash::exception::RuntimeError, - "dash::copy_async_impl [Future]: dart_waitall failed"); - } - } else { - DASH_LOG_TRACE("dash::copy_async_impl [Future]", " No pending handles"); - } -#endif - req_handles->clear(); - DASH_LOG_TRACE("dash::copy_async_impl [Future] >", - " async requests completed, _out:", _out); - return _out; - } -#ifndef DASH__ALGORITHM__COPY__USE_FLUSH - , - // test - [=](GlobOutputIt *out) mutable { - int32_t flag; - dart_testall_local(req_handles->data(), req_handles->size(), &flag); - if (flag) { - req_handles->clear(); - *out = out_first + num_copy_elem; - } - return (flag != 0); - }, - // destroy - [=]() mutable { - for (auto& handle : *req_handles) { - dart_handle_free(&handle); - } - } -#endif - ); - DASH_LOG_TRACE("dash::copy_async_impl >", " returning future"); - return result; -} - } // namespace internal @@ -657,17 +287,19 @@ dash::Future copy_async( ValueType * out_first) { const auto & team = in_first.team(); + + DASH_LOG_TRACE("dash::copy_async()", "async, global to local"); + if (in_first == in_last) { + DASH_LOG_TRACE("dash::copy_async", "input range empty"); + return dash::Future(out_first); + } + dash::util::UnitLocality uloc(team, team.myid()); // Size of L2 data cache line: int l2_line_size = uloc.hwinfo().cache_line_sizes[1]; bool use_memcpy = ((in_last - in_first) * sizeof(ValueType)) <= l2_line_size; - DASH_LOG_TRACE("dash::copy_async()", "async, global to local"); - if (in_first == in_last) { - DASH_LOG_TRACE("dash::copy_async", "input range empty"); - return dash::Future([=]() { return out_first; }); - } ValueType * dest_first = out_first; // Return value, initialize with begin of output range, indicating no values // have been copied: @@ -707,15 +339,15 @@ dash::Future copy_async( } DASH_LOG_TRACE("dash::copy_async", "finished local copy of", (out_last - out_first), "elements"); - return dash::Future([=]() { return out_last; }); + return dash::Future(out_last); } + auto handles = std::make_shared>(); + DASH_LOG_TRACE("dash::copy_async", "local range:", li_range_in.begin, li_range_in.end, "in_first.is_local:", in_first.is_local()); - // Futures of asynchronous get requests: - auto futures = std::vector< dash::Future >(); // Check if global input range is partially local: if (num_local_elem > 0) { // Part of the input range is local, copy local input subrange to local @@ -757,10 +389,10 @@ dash::Future copy_async( // ... [ --- copy --- | ... l ... | ........ ] // ^ ^ ^ ^ // in_first l_in_first l_in_last in_last - auto fut_prelocal = dash::internal::copy_async_impl(g_in_first, - g_l_in_first, - dest_first); - futures.push_back(fut_prelocal); + dash::internal::copy_impl(g_in_first, + g_l_in_first, + dest_first, + *handles); // Advance output pointers: out_last += num_prelocal_elem; dest_first = out_last; @@ -780,10 +412,10 @@ dash::Future copy_async( // ... [ ........ | ... l ... | --- copy --- ] // ^ ^ ^ ^ // in_first l_in_first l_in_last in_last - auto fut_postlocal = dash::internal::copy_async_impl(g_l_in_last, - g_in_last, - dest_first); - futures.push_back(fut_postlocal); + dash::internal::copy_impl(g_l_in_last, + g_in_last, + dest_first, + *handles); out_last += num_postlocal_elem; } // @@ -829,26 +461,59 @@ dash::Future copy_async( } else { DASH_LOG_TRACE("dash::copy_async", "no local subrange"); // All elements in input range are remote - auto fut_all = dash::internal::copy_async_impl(in_first, - in_last, - dest_first); - futures.push_back(fut_all); + dash::internal::copy_impl(in_first, + in_last, + dest_first, + *handles); out_last = out_first + total_copy_elem; } DASH_LOG_TRACE("dash::copy_async", "preparing future"); - dash::Future fut_result([=]() mutable { - ValueType * _out = out_last; - DASH_LOG_TRACE("dash::copy_async [Future]()", - "wait for", futures.size(), "async copy requests"); - DASH_LOG_TRACE("dash::copy_async [Future]", " futures:", futures); - DASH_LOG_TRACE("dash::copy_async [Future]", " _out:", _out); - for (auto f : futures) { - f.wait(); + dash::Future fut_result( + // wait + [=]() mutable { + // Wait for all get requests to complete: + ValueType * _out = out_last; + DASH_LOG_TRACE("dash::copy_async_impl [Future]()", + " wait for", handles->size(), "async get request"); + DASH_LOG_TRACE("dash::copy_async_impl [Future]", " _out:", _out); + if (handles->size() > 0) { + if (dart_waitall_local(handles->data(), handles->size()) + != DART_OK) { + DASH_LOG_ERROR("dash::copy_async_impl [Future]", + " dart_waitall_local failed"); + DASH_THROW( + dash::exception::RuntimeError, + "dash::copy_async_impl [Future]: dart_waitall_local failed"); + } + } else { + DASH_LOG_TRACE("dash::copy_async_impl [Future]", " No pending handles"); + } + DASH_LOG_TRACE("dash::copy_async_impl [Future] >", + " async requests completed, _out:", _out); + return _out; + }, + // test + [=](ValueType ** out) mutable { + int32_t flag; + DASH_ASSERT_RETURNS( + DART_OK, + dart_testall_local(handles->data(), handles->size(), &flag)); + if (flag) { + handles->clear(); + *out = out_last; + } + return (flag != 0); + }, + // destroy + [=]() mutable { + for (auto& handle : *handles) { + DASH_ASSERT_RETURNS( + DART_OK, + dart_handle_free(&handle)); + } } - DASH_LOG_TRACE("dash::copy_async [Future] >", "async requests completed", - "futures:", futures, "_out:", _out); - return _out; - }); + ); + DASH_LOG_TRACE("dash::copy_async >", "finished,", "expected out_last:", out_last); return fut_result; @@ -915,6 +580,8 @@ ValueType * copy( return out_last; } + std::vector handles; + DASH_LOG_TRACE("dash::copy", "local range:", li_range_in.begin, li_range_in.end, @@ -967,7 +634,8 @@ ValueType * copy( // in_first l_in_first l_in_last in_last out_last = dash::internal::copy_impl(g_in_first, g_l_in_first, - dest_first); + dest_first, + handles); // Advance output pointers: dest_first = out_last; } @@ -1025,15 +693,24 @@ ValueType * copy( // in_first l_in_first l_in_last in_last out_last = dash::internal::copy_impl(g_l_in_last, g_in_last, - dest_first); + dest_first, + handles); } } else { DASH_LOG_TRACE("dash::copy", "no local subrange"); // All elements in input range are remote out_last = dash::internal::copy_impl(in_first, in_last, - dest_first); + dest_first, + handles); + } + + if (handles.size() > 0) { + DASH_LOG_TRACE("dash::copy", "Waiting for remote transfers to complete,", + "num_handles: ", handles.size()); + dart_waitall_local(handles.data(), handles.size()); } + DASH_LOG_TRACE("dash::copy >", "finished,", "out_last:", out_last); return out_last; @@ -1057,10 +734,58 @@ dash::Future copy_async( ValueType * in_last, GlobOutputIt out_first) { - auto fut = dash::internal::copy_async_impl(in_first, - in_last, - out_first); - return fut; + auto handles = std::make_shared>(); + auto out_last = dash::internal::copy_impl(in_first, + in_last, + out_first, + *handles); + dash::Future fut_result( + // get + [=]() mutable { + // Wait for all get requests to complete: + GlobOutputIt _out = out_last; + DASH_LOG_TRACE("dash::copy_async [Future]()", + " wait for", handles->size(), "async put request"); + DASH_LOG_TRACE("dash::copy_async [Future]", " _out:", _out); + if (handles->size() > 0) { + if (dart_waitall(handles->data(), handles->size()) + != DART_OK) { + DASH_LOG_ERROR("dash::copy_async [Future]", + " dart_waitall failed"); + DASH_THROW( + dash::exception::RuntimeError, + "dash::copy_async [Future]: dart_waitall failed"); + } + } else { + DASH_LOG_TRACE("dash::copy_async [Future]", " No pending handles"); + } + handles->clear(); + DASH_LOG_TRACE("dash::copy_async [Future] >", + " async requests completed, _out:", _out); + return _out; + }, + // test + [=](GlobOutputIt *out) mutable { + int32_t flag; + DASH_ASSERT_RETURNS( + DART_OK, + dart_testall(handles->data(), handles->size(), &flag)); + if (flag) { + handles->clear(); + *out = out_last; + } + return (flag != 0); + }, + // destroy + [=]() mutable { + for (auto& handle : *handles) { + DASH_ASSERT_RETURNS( + DART_OK, + dart_handle_free(&handle)); + } + } + ); + return fut_result; } /** @@ -1093,6 +818,8 @@ GlobOutputIt copy( DASH_LOG_TRACE_VAR("dash::copy", li_range_out.end); // Number of elements in the local subrange: auto num_local_elem = li_range_out.end - li_range_out.begin; + // handles to wait on at the end + std::vector handles; // Check if part of the output range is local: if (num_local_elem > 0) { // Part of the output range is local @@ -1132,7 +859,8 @@ GlobOutputIt copy( out_last = dash::internal::copy_impl( in_first, in_first + l_elem_offset, - out_first); + out_first, + handles); } // Copy to remote elements succeeding the local subrange: if (g_l_offset_end < out_h_last.pos()) { @@ -1140,7 +868,8 @@ GlobOutputIt copy( out_last = dash::internal::copy_impl( in_first + l_elem_offset + num_local_elem, in_last, - out_first + num_local_elem); + out_first + num_local_elem, + handles); } } else { // All elements in output range are remote @@ -1148,8 +877,16 @@ GlobOutputIt copy( out_last = dash::internal::copy_impl( in_first, in_last, - out_first); + out_first, + handles); + } + + if (handles.size() > 0) { + DASH_LOG_TRACE("dash::copy", "Waiting for remote transfers to complete,", + "num_handles: ", handles.size()); + dart_waitall_local(handles.data(), handles.size()); } + return out_last; } From ef52c0e11f8eda117a7bc9007f726c9198cfcdae Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Tue, 14 Nov 2017 15:02:46 +0900 Subject: [PATCH 07/11] Add tests for waiting on a copy using fut.test() --- dash/test/algorithm/CopyTest.cc | 98 +++++++++++++++++++++++++++++++-- 1 file changed, 92 insertions(+), 6 deletions(-) diff --git a/dash/test/algorithm/CopyTest.cc b/dash/test/algorithm/CopyTest.cc index c27b60f70..8effa9ea4 100644 --- a/dash/test/algorithm/CopyTest.cc +++ b/dash/test/algorithm/CopyTest.cc @@ -414,7 +414,7 @@ TEST_F(CopyTest, BlockingLocalToGlobalBlock) array.barrier(); } -TEST_F(CopyTest, AsyncLocalToGlobPtr) +TEST_F(CopyTest, AsyncLocalToGlobPtrWait) { // Copy all elements contained in a single, continuous block. const int num_elem_per_unit = 5; @@ -443,14 +443,14 @@ TEST_F(CopyTest, AsyncLocalToGlobPtr) glob_ptr_t gptr_dest = static_cast( array.begin() + global_offset); - LOG_MESSAGE("CopyTest.AsyncLocalToGlobPtr: call copy_async"); + LOG_MESSAGE("CopyTest.AsyncLocalToGlobPtrWait: call copy_async"); auto copy_fut = dash::copy_async(local_range, local_range + num_elem_per_unit, gptr_dest); // Blocks until remote completion: - LOG_MESSAGE("CopyTest.AsyncLocalToGlobPtr: call fut.wait"); + LOG_MESSAGE("CopyTest.AsyncLocalToGlobPtrWait: call fut.wait"); copy_fut.wait(); array.barrier(); @@ -463,6 +463,56 @@ TEST_F(CopyTest, AsyncLocalToGlobPtr) array.barrier(); } + +TEST_F(CopyTest, AsyncLocalToGlobPtrTest) +{ + // Copy all elements contained in a single, continuous block. + const int num_elem_per_unit = 5; + size_t num_elem_total = _dash_size * num_elem_per_unit; + + // Global target range: + dash::Array array(num_elem_total, dash::BLOCKED); + // Local range to copy: + int local_range[num_elem_per_unit]; + + // Assign initial values: [ 1000, 1001, 1002, ... 2000, 2001, ... ] + for (auto l = 0; l < num_elem_per_unit; ++l) { + array.local[l] = ((dash::myid() + 1) * 110000) + l; + local_range[l] = ((dash::myid() + 1) * 1000) + l; + } + array.barrier(); + + // Copy values from local range to remote global range. + // All units (u) copy into block (nblocks-1-u), so unit 0 copies into + // last block. + auto block_offset = (dash::myid() + 1) % dash::size(); + auto global_offset = block_offset * num_elem_per_unit; + + using glob_it_t = decltype(array.begin()); + using glob_ptr_t = typename glob_it_t::pointer; + + glob_ptr_t gptr_dest = static_cast( + array.begin() + global_offset); + LOG_MESSAGE("CopyTest.AsyncLocalToGlobPtrTest: call copy_async"); + + auto copy_fut = dash::copy_async(local_range, + local_range + num_elem_per_unit, + gptr_dest); + + // Blocks until remote completion: + LOG_MESSAGE("CopyTest.AsyncLocalToGlobPtrTest: call fut.test"); + while (!copy_fut.test()) {} + + array.barrier(); + + for (auto l = 0; l < num_elem_per_unit; ++l) { + // Compare local buffer and global array dest range: + EXPECT_EQ_U(local_range[l], + static_cast(array[global_offset + l])); + } + array.barrier(); +} + TEST_F(CopyTest, BlockingGlobalToLocalSubBlock) { // Copy all elements contained in a single, continuous block, @@ -729,7 +779,7 @@ TEST_F(CopyTest, AsyncGlobalToLocalTiles) auto req = dash::copy_async(gblock_a.begin(), gblock_a.end(), matrix_b_dest); - req_handles.push_back(req); + req_handles.push_back(std::move(req)); dst_pointers.push_back(matrix_b_dest); } @@ -743,7 +793,7 @@ TEST_F(CopyTest, AsyncGlobalToLocalTiles) // To prevent compiler from removing work load loop in optimization: LOG_MESSAGE("Dummy result: %f", m); - for (auto req : req_handles) { + for (auto& req : req_handles) { // Wait for completion of async copy operation. // Returns pointer to final element copied into target range: value_t * copy_dest_end = req.get(); @@ -769,7 +819,7 @@ TEST_F(CopyTest, AsyncGlobalToLocalTiles) } } -TEST_F(CopyTest, AsyncGlobalToLocalBlock) +TEST_F(CopyTest, AsyncGlobalToLocalBlockWait) { // Copy all elements contained in a single, continuous block. const int num_elem_per_unit = 20; @@ -803,6 +853,42 @@ TEST_F(CopyTest, AsyncGlobalToLocalBlock) } } +TEST_F(CopyTest, AsyncGlobalToLocalTest) +{ + // Copy all elements contained in a single, continuous block. + const int num_elem_per_unit = 20; + size_t num_elem_total = _dash_size * num_elem_per_unit; + + dash::Array array(num_elem_total, dash::BLOCKED); + + EXPECT_EQ_U(num_elem_per_unit, array.local.size()); + EXPECT_EQ_U(num_elem_per_unit, array.lsize()); + + // Assign initial values: [ 1000, 1001, 1002, ... 2000, 2001, ... ] + for (auto l = 0; l < num_elem_per_unit; ++l) { + array.local[l] = ((dash::myid() + 1) * 1000) + l; + } + array.barrier(); + + // Local range to store copy: + int local_copy[num_elem_per_unit]; + + // Copy values from global range to local memory. + // All units copy first block, so unit 0 tests local-to-local copying. + auto dest_end = dash::copy_async(array.begin(), + array.begin() + num_elem_per_unit, + local_copy); + + // spin until the transfer is completed + while (!dest_end.test()) { } + + EXPECT_EQ_U(num_elem_per_unit, dest_end.get() - local_copy); + for (auto l = 0; l < num_elem_per_unit; ++l) { + EXPECT_EQ_U(static_cast(array[l]), + local_copy[l]); + } +} + #if 0 // TODO TEST_F(CopyTest, AsyncAllToLocalVector) From 008a90e36767bfcb922bc3332302cfa66f5a69ba Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Tue, 14 Nov 2017 15:22:00 +0900 Subject: [PATCH 08/11] Add dart_test to test for local completion and wait for remote completion if necessary --- .../include/dash/dart/if/dart_communication.h | 17 +++++++ dart-impl/mpi/src/dart_communication.c | 47 +++++++++++++++++-- 2 files changed, 59 insertions(+), 5 deletions(-) diff --git a/dart-if/include/dash/dart/if/dart_communication.h b/dart-if/include/dash/dart/if/dart_communication.h index 6c57dc3da..cd513e4b8 100644 --- a/dart-if/include/dash/dart/if/dart_communication.h +++ b/dart-if/include/dash/dart/if/dart_communication.h @@ -582,6 +582,23 @@ dart_ret_t dart_test_local( dart_handle_t * handle, int32_t * result) DART_NOTHROW; +/** + * Test for the completion of an operation and ensure remote completion. + * If the transfer completed, the handle is invalidated and may not be used + * in another \c dart_wait or \c dart_test operation. + * + * \param handle The handle of an operation to test for completion. + * \param[out] result \c True if the operation has completed. + * + * \return \c DART_OK on success, any other of \ref dart_ret_t otherwise. + * + * \threadsafe + * \ingroup DartCommunication + */ +dart_ret_t dart_test( + dart_handle_t * handleptr, + int32_t * is_finished); + /** * Test for the local completion of operations. * If the transfers completed, the handles are invalidated and may not be diff --git a/dart-impl/mpi/src/dart_communication.c b/dart-impl/mpi/src/dart_communication.c index 6842a0e90..d526bdcbc 100644 --- a/dart-impl/mpi/src/dart_communication.c +++ b/dart-impl/mpi/src/dart_communication.c @@ -1420,11 +1420,10 @@ dart_ret_t dart_test_local( *is_finished = 0; dart_handle_t handle = *handleptr; - if (MPI_Testall(handle->num_reqs, handle->reqs, - &flag, MPI_STATUSES_IGNORE) != MPI_SUCCESS) { - DART_LOG_ERROR("dart_test_local: MPI_Test failed!"); - return DART_ERR_OTHER; - } + CHECK_MPI_RET( + MPI_Testall(handle->num_reqs, handle->reqs, + &flag, MPI_STATUSES_IGNORE), + "MPI_Testall"); if (flag) { // deallocate handle @@ -1436,6 +1435,44 @@ dart_ret_t dart_test_local( return DART_OK; } + +dart_ret_t dart_test( + dart_handle_t * handleptr, + int32_t * is_finished) +{ + int flag; + + DART_LOG_DEBUG("dart_test()"); + if (handleptr == NULL || + *handleptr == DART_HANDLE_NULL || + (*handleptr)->num_reqs == 0) { + *is_finished = 1; + return DART_OK; + } + *is_finished = 0; + + dart_handle_t handle = *handleptr; + CHECK_MPI_RET( + MPI_Testall(handle->num_reqs, handle->reqs, + &flag, MPI_STATUSES_IGNORE), + "MPI_Testall"); + + if (flag) { + if (handle->needs_flush) { + CHECK_MPI_RET( + MPI_Win_flush(handle->dest, handle->win), + "MPI_Win_flush" + ); + } + // deallocate handle + free(handle); + *handleptr = DART_HANDLE_NULL; + *is_finished = 1; + } + DART_LOG_DEBUG("dart_test > finished"); + return DART_OK; +} + dart_ret_t dart_testall_local( dart_handle_t handles[], size_t n, From bbef3ca08f110b4ad3ed4a9abad4fb94548ce97b Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Fri, 24 Nov 2017 14:26:47 +0900 Subject: [PATCH 09/11] dart_testall: signal success if no valid handles available --- dart-impl/mpi/src/dart_communication.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dart-impl/mpi/src/dart_communication.c b/dart-impl/mpi/src/dart_communication.c index 65b0c7a82..f69047c28 100644 --- a/dart-impl/mpi/src/dart_communication.c +++ b/dart-impl/mpi/src/dart_communication.c @@ -1667,6 +1667,8 @@ dart_ret_t dart_testall( } } } + } else { + *is_finished = 1; } FREE_TMP(2 * n * sizeof(MPI_Request), mpi_req); DART_LOG_DEBUG("dart_testall_local > finished"); From 1a1f7134c37bbb11e57ccaf31a72ba8400fce897 Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Fri, 24 Nov 2017 14:27:53 +0900 Subject: [PATCH 10/11] copy_async: short-cut if no handles required --- dash/include/dash/algorithm/Copy.h | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/dash/include/dash/algorithm/Copy.h b/dash/include/dash/algorithm/Copy.h index d98c2b1d0..f1f7fbf33 100644 --- a/dash/include/dash/algorithm/Copy.h +++ b/dash/include/dash/algorithm/Copy.h @@ -153,7 +153,9 @@ ValueType * copy_impl( cur_out_first, num_elem_total, &handle); - handles.push_back(handle); + if (handle != DART_HANDLE_NULL) { + handles.push_back(handle); + } num_elem_copied = num_elem_total; } else { // Input range is spread over several remote units: @@ -199,7 +201,9 @@ ValueType * copy_impl( dart_handle_t handle; dash::internal::get_handle(src_gptr, dest_ptr, num_copy_elem, &handle); num_elem_copied += num_copy_elem; - handles.push_back(handle); + if (handle != DART_HANDLE_NULL) { + handles.push_back(handle); + } } } @@ -235,9 +239,11 @@ GlobOutputIt copy_impl( dash::internal::put_handle( out_first.dart_gptr(), in_first, - num_elements, + num_elements, &handle); - handles.push_back(handle); + if (handle != DART_HANDLE_NULL) { + handles.push_back(handle); + } auto out_last = out_first + num_elements; DASH_LOG_TRACE("dash::copy_impl >", @@ -450,6 +456,11 @@ dash::Future copy_async( out_last = out_first + total_copy_elem; } DASH_LOG_TRACE("dash::copy_async", "preparing future"); + if (handles->size() == 0) { + DASH_LOG_TRACE("dash::copy_async >", "finished (no pending handles), ", + "out_last:", out_last); + return dash::Future(out_last); + } dash::Future fut_result( // wait [=]() mutable { @@ -721,6 +732,10 @@ dash::Future copy_async( in_last, out_first, *handles); + + if (handles->size() == 0) { + return dash::Future(out_last); + } dash::Future fut_result( // get [=]() mutable { From 3f8cbde6a62289e079fe5a561c2b16e13ef0b9f9 Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Thu, 30 Nov 2017 10:21:40 +0900 Subject: [PATCH 11/11] Work-around MPICH's broken MPI_Testall --- dart-impl/mpi/src/dart_communication.c | 62 +++++++++++++++++++------- 1 file changed, 46 insertions(+), 16 deletions(-) diff --git a/dart-impl/mpi/src/dart_communication.c b/dart-impl/mpi/src/dart_communication.c index f69047c28..c62c40b24 100644 --- a/dart-impl/mpi/src/dart_communication.c +++ b/dart-impl/mpi/src/dart_communication.c @@ -172,8 +172,8 @@ dart__mpi__get( { if (reqs != NULL) { return MPI_Rget(origin_addr, origin_count, origin_datatype, - target_rank, target_disp, target_count, target_datatype, - win, &reqs[(*num_reqs)++]); + target_rank, target_disp, target_count, target_datatype, + win, &reqs[(*num_reqs)++]); } else { return MPI_Get(origin_addr, origin_count, origin_datatype, target_rank, target_disp, target_count, @@ -191,8 +191,8 @@ dart__mpi__put( { if (reqs != NULL) { return MPI_Rput(origin_addr, origin_count, origin_datatype, - target_rank, target_disp, target_count, target_datatype, - win, &reqs[(*num_reqs)++]); + target_rank, target_disp, target_count, target_datatype, + win, &reqs[(*num_reqs)++]); } else { return MPI_Put(origin_addr, origin_count, origin_datatype, target_rank, target_disp, target_count, @@ -609,7 +609,7 @@ dart_ret_t dart_accumulate( CHECK_UNITID_RANGE(team_unit_id, team_data); - DART_LOG_DEBUG("dart_accumulate() nelem:%zu dtype:%d op:%d unit:%d", + DART_LOG_DEBUG("dart_accumulate() nelem:%zu dtype:%ld op:%d unit:%d", nelem, dtype, op, team_unit_id.id); dart_segment_info_t *seginfo = dart_segment_get_info( @@ -703,7 +703,7 @@ dart_ret_t dart_fetch_and_op( CHECK_UNITID_RANGE(team_unit_id, team_data); - DART_LOG_DEBUG("dart_fetch_and_op() dtype:%d op:%d unit:%d " + DART_LOG_DEBUG("dart_fetch_and_op() dtype:%ld op:%d unit:%d " "offset:%"PRIu64" segid:%d", dtype, op, team_unit_id.id, gptr.addr_or_offs.offset, seg_id); @@ -755,7 +755,7 @@ dart_ret_t dart_compare_and_swap( CHECK_UNITID_RANGE(team_unit_id, team_data); - DART_LOG_TRACE("dart_compare_and_swap() dtype:%d unit:%d offset:%"PRIu64, + DART_LOG_TRACE("dart_compare_and_swap() dtype:%ld unit:%d offset:%"PRIu64, dtype, team_unit_id.id, gptr.addr_or_offs.offset); dart_segment_info_t *seginfo = dart_segment_get_info( @@ -1492,6 +1492,39 @@ dart_ret_t dart_waitall( return DART_OK; } +/** + * Wrapper around MPI_Testall to account for broken MPICH implementation. + * MPICH <= 3.2.1 and its derivatives seem to be affected + */ +inline static +int +dart__mpi__testall(int num_reqs, MPI_Request *reqs, int *flag_ptr) +{ +#if defined(MPICH_NUMVERSION) && MPICH_NUMVERSION <= 30201300 + int flag_result = 1; + for (int i = 0; i < num_reqs; ++i) { + int flag; + /* + * if the test succeeds the request is set to MPI_REQUEST_NULL, + * which can be safely passed to MPI_Test again. + * Eventually we will have all requests tested succesfully. + */ + int ret = MPI_Test(&reqs[i], &flag, MPI_STATUS_IGNORE); + if (ret != MPI_SUCCESS) { + return ret; + } + // one incomplete request will flip the flag to 0 + flag_result &= flag; + } + *flag_ptr = flag_result; + // we checked all requests succesfully + return MPI_SUCCESS; +#else + return MPI_Testall(num_reqs, reqs, + flag_ptr, MPI_STATUSES_IGNORE); +#endif //defined(MPICH_NUMVERSION) && MPICH_NUMVERSION <= 30201300 +} + dart_ret_t dart_test_local( dart_handle_t * handleptr, int32_t * is_finished) @@ -1509,8 +1542,7 @@ dart_ret_t dart_test_local( dart_handle_t handle = *handleptr; CHECK_MPI_RET( - MPI_Testall(handle->num_reqs, handle->reqs, - &flag, MPI_STATUSES_IGNORE), + dart__mpi__testall(handle->num_reqs, handle->reqs, &flag), "MPI_Testall"); if (flag) { @@ -1541,8 +1573,7 @@ dart_ret_t dart_test( dart_handle_t handle = *handleptr; CHECK_MPI_RET( - MPI_Testall(handle->num_reqs, handle->reqs, - &flag, MPI_STATUSES_IGNORE), + dart__mpi__testall(handle->num_reqs, handle->reqs, &flag), "MPI_Testall"); if (flag) { @@ -1590,8 +1621,7 @@ dart_ret_t dart_testall_local( } if (r_n) { - if (MPI_Testall(r_n, mpi_req, &flag, - MPI_STATUSES_IGNORE) != MPI_SUCCESS){ + if (dart__mpi__testall(r_n, mpi_req, &flag) != MPI_SUCCESS){ FREE_TMP(2 * n * sizeof(MPI_Request), mpi_req); DART_LOG_ERROR("dart_testall_local: MPI_Testall failed!"); return DART_ERR_OTHER; @@ -1641,10 +1671,10 @@ dart_ret_t dart_testall( } if (r_n) { - if (MPI_Testall(r_n, mpi_req, is_finished, - MPI_STATUSES_IGNORE) != MPI_SUCCESS){ + DART_LOG_TRACE(" MPI_Testall on %zu requests", r_n); + if (dart__mpi__testall(r_n, mpi_req, is_finished) != MPI_SUCCESS){ + DART_LOG_ERROR("dart_testall: MPI_Testall failed"); FREE_TMP(2 * n * sizeof(MPI_Request), mpi_req); - DART_LOG_ERROR("dart_testall_local: MPI_Testall failed!"); return DART_ERR_OTHER; }