Skip to content

Commit

Permalink
Removes default resource and explicitly pass the data location (#18)
Browse files Browse the repository at this point in the history
* Fixed allocators

* Fix Device issues

* Fix device faiss initialization and destruction

* Fix memory stats issue
  • Loading branch information
koparasy authored Nov 6, 2023
1 parent 96570d8 commit 0afc1b4
Show file tree
Hide file tree
Showing 14 changed files with 428 additions and 513 deletions.
151 changes: 131 additions & 20 deletions examples/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <mfem.hpp>
#include <random>
#include <string>
#include <umpire/strategy/QuickPool.hpp>
#include <unordered_set>
#include <vector>

Expand All @@ -24,6 +25,94 @@

#include "AMS.h"

/**
 * Prints memory statistics for a set of named Umpire allocators to stdout.
 *
 * Every name in `allocators` is looked up through the Umpire ResourceManager
 * and one line is written with the allocator's high watermark, current size
 * and actual size, as reported by the allocator itself.
 *
 * @param allocators Names of the allocators to report on. The set is only
 *                   read, never modified.
 */
void printMemory(std::unordered_set<std::string> &allocators)
{
  auto &rm = umpire::ResourceManager::getInstance();
  // Iterate by const reference: the names are only read, so copying each
  // std::string on every iteration would be wasted work.
  for (const auto &name : allocators) {
    auto alloc = rm.getAllocator(name);
    const size_t high_watermark = alloc.getHighWatermark();
    const size_t current_size = alloc.getCurrentSize();
    const size_t actual_size = alloc.getActualSize();
    std::cout << "Allocator '" << name << "' High WaterMark:" << high_watermark
              << " Current Size:" << current_size
              << " Actual Size:" << actual_size << "\n";
  }
}


/**
 * Creates an Umpire QuickPool allocator on top of an existing allocator.
 *
 * Both names are logged to stdout before the pool is created. The new pool is
 * created through the Umpire ResourceManager under `pool_name`; the returned
 * allocator handle is not kept here because callers in this file look the
 * pool up again by name.
 *
 * @param parent_name Name of the allocator the pool draws its memory from
 *                    (resolved with ResourceManager::getAllocator).
 * @param pool_name   Name given to the newly created pool.
 */
void createUmpirePool(std::string parent_name, std::string pool_name)
{
  // A separating space keeps the two names from running together in the log.
  std::cout << "Pool Name " << pool_name << " Parent Allocation "
            << parent_name << "\n";
  auto &rm = umpire::ResourceManager::getInstance();
  // NOTE(review): the second template argument `true` presumably enables
  // Umpire's allocation introspection/tracking -- confirm against the Umpire
  // version in use.
  rm.makeAllocator<umpire::strategy::QuickPool, true>(
      pool_name, rm.getAllocator(parent_name));
}

/**
 * Chooses the Umpire allocator names used by the physics code and by AMS
 * according to the requested pool strategy, creating pools where needed.
 *
 * Strategies:
 *  - "default": physics and AMS both use the allocators named "HOST" (and,
 *               when built with __ENABLE_CUDA__, "DEVICE" and "PINNED").
 *               No pool is created.
 *  - "split":   physics and AMS each get their own pool ("phys-*" and
 *               "ams-*") created with createUmpirePool on top of
 *               "HOST"/"DEVICE"/"PINNED".
 *  - "same":    physics and AMS share one pool per resource ("common-*").
 *
 * The device and pinned output names are only written when the code is
 * compiled with __ENABLE_CUDA__; otherwise they are left untouched.
 *
 * @param pool                 Strategy name: "default", "split" or "same".
 * @param physics_host_alloc   [out] Host allocator name for the physics code.
 * @param physics_device_alloc [out] Device allocator name for the physics code.
 * @param physics_pinned_alloc [out] Pinned allocator name for the physics code.
 * @param ams_host_alloc       [out] Host allocator name for AMS.
 * @param ams_device_alloc     [out] Device allocator name for AMS.
 * @param ams_pinned_alloc     [out] Pinned allocator name for AMS.
 * @return The set of distinct allocator names selected or created.
 * @throws std::runtime_error if `pool` is not one of the known strategies
 *         (the offending value is printed to stdout first).
 */
std::unordered_set<std::string> createMemoryAllocators(
    std::string pool,
    std::string &physics_host_alloc,
    std::string &physics_device_alloc,
    std::string &physics_pinned_alloc,
    std::string &ams_host_alloc,
    std::string &ams_device_alloc,
    std::string &ams_pinned_alloc)
{
  std::unordered_set<std::string> allocator_names;

  if (pool == "default") {
    // Use the existing allocators directly; physics and AMS share them.
    physics_host_alloc = ams_host_alloc = "HOST";
    allocator_names.insert(ams_host_alloc);
#ifdef __ENABLE_CUDA__
    physics_device_alloc = ams_device_alloc = "DEVICE";
    allocator_names.insert(ams_device_alloc);
    physics_pinned_alloc = ams_pinned_alloc = "PINNED";
    allocator_names.insert(ams_pinned_alloc);
#endif
  } else if (pool == "split") {
    // Separate pools for physics and for AMS on every resource.
    physics_host_alloc = "phys-host";
    createUmpirePool("HOST", physics_host_alloc);
    allocator_names.insert(physics_host_alloc);

    ams_host_alloc = "ams-host";
    createUmpirePool("HOST", ams_host_alloc);
    allocator_names.insert(ams_host_alloc);

#ifdef __ENABLE_CUDA__
    physics_device_alloc = "phys-device";
    createUmpirePool("DEVICE", physics_device_alloc);
    allocator_names.insert(physics_device_alloc);

    physics_pinned_alloc = "phys-pinned";
    createUmpirePool("PINNED", physics_pinned_alloc);
    allocator_names.insert(physics_pinned_alloc);

    ams_device_alloc = "ams-device";
    createUmpirePool("DEVICE", ams_device_alloc);
    allocator_names.insert(ams_device_alloc);

    ams_pinned_alloc = "ams-pinned";
    createUmpirePool("PINNED", ams_pinned_alloc);
    allocator_names.insert(ams_pinned_alloc);
#endif
  } else if (pool == "same") {
    // One pool per resource, shared by physics and AMS.
    physics_host_alloc = ams_host_alloc = "common-host";
    createUmpirePool("HOST", physics_host_alloc);
    allocator_names.insert(physics_host_alloc);
#ifdef __ENABLE_CUDA__
    physics_device_alloc = ams_device_alloc = "common-device";
    createUmpirePool("DEVICE", ams_device_alloc);
    allocator_names.insert(ams_device_alloc);
    physics_pinned_alloc = ams_pinned_alloc = "common-pinned";
    createUmpirePool("PINNED", ams_pinned_alloc);
    allocator_names.insert(ams_pinned_alloc);
#endif
  } else {
    std::cout << "Strategy is " << pool << "\n";
    throw std::runtime_error("Pool strategy does not exist\n");
  }

  // Return the local by name so the compiler can apply NRVO (or an implicit
  // move); wrapping it in std::move would inhibit copy elision.
  return allocator_names;
}


using TypeValue = double;
using mfem::ForallWrap;

Expand Down Expand Up @@ -96,6 +185,7 @@ int main(int argc, char **argv)
TypeValue avg = 0.5;
TypeValue stdDev = 0.2;
bool reqDB = false;
const char *pool = "default";

#ifdef __ENABLE_DB__
reqDB = true;
Expand Down Expand Up @@ -203,6 +293,14 @@ int main(int argc, char **argv)
args.AddOption(
&verbose, "-v", "--verbose", "-qu", "--quiet", "Print extra stuff");

args.AddOption(&pool,
"-ptype",
"--pool-type",
"How to assign memory pools to AMSlib:\n"
"\t 'default' Use the default Umpire pool\n"
"\t 'split' provide a separate pool to AMSlib\n"
"\t 'same': assign the same with physics to AMS\n");

// -------------------------------------------------------------------------
// parse arguments
// -------------------------------------------------------------------------
Expand Down Expand Up @@ -305,31 +403,44 @@ int main(int argc, char **argv)

std::cerr << "Rank:" << rId << " Threshold " << threshold << "\n";

// -------------------------------------------------------------------------
// setup data allocators
// -------------------------------------------------------------------------
AMSSetupAllocator(AMSResourceType::HOST);
if (use_device) {
AMSSetupAllocator(AMSResourceType::DEVICE);
AMSSetupAllocator(AMSResourceType::PINNED);
AMSSetDefaultAllocator(AMSResourceType::DEVICE);
} else {
AMSSetDefaultAllocator(AMSResourceType::HOST);
}

// -------------------------------------------------------------------------
// setup mfem memory manager
// -------------------------------------------------------------------------
// hardcoded names!
const std::string &alloc_name_host(
AMSGetAllocatorName(AMSResourceType::HOST));
const std::string &alloc_name_device(
AMSGetAllocatorName(AMSResourceType::DEVICE));

mfem::MemoryManager::SetUmpireHostAllocatorName(alloc_name_host.c_str());
std::string physics_host_alloc;
std::string physics_device_alloc;
std::string physics_pinned_alloc;

std::string ams_host_alloc;
std::string ams_device_alloc;
std::string ams_pinned_alloc;

auto allocator_names = createMemoryAllocators(std::string(pool),
physics_host_alloc,
physics_device_alloc,
physics_pinned_alloc,
ams_host_alloc,
ams_device_alloc,
ams_pinned_alloc);


mfem::MemoryManager::SetUmpireHostAllocatorName(physics_host_alloc.c_str());
if (use_device) {
mfem::MemoryManager::SetUmpireDeviceAllocatorName(
alloc_name_device.c_str());
physics_device_alloc.c_str());
}


// When we are not allocating from parent/root umpire allocator
// we need to inform AMS about the pool allocators.
if (strcmp(pool, "default") != 0) {
AMSSetAllocator(AMSResourceType::HOST, ams_host_alloc.c_str());

if (use_device) {
AMSSetAllocator(AMSResourceType::DEVICE, ams_device_alloc.c_str());
AMSSetAllocator(AMSResourceType::PINNED, ams_pinned_alloc.c_str());
}
}

mfem::Device::SetMemoryTypes(mfem::MemoryType::HOST_UMPIRE,
Expand All @@ -340,7 +451,6 @@ int main(int argc, char **argv)
device.Print();
std::cout << std::endl;

//AMSResourceInfo();

// -------------------------------------------------------------------------
// setup indicators
Expand Down Expand Up @@ -399,7 +509,7 @@ int main(int argc, char **argv)
if (eos_name == std::string("ideal_gas")) {
eoses[mat_idx] = new IdealGas(1.6, 1.4);
} else if (eos_name == std::string("constant_host")) {
eoses[mat_idx] = new ConstantEOSOnHost(alloc_name_host.c_str(), 1.0);
eoses[mat_idx] = new ConstantEOSOnHost(physics_host_alloc.c_str(), 1.0);
} else {
std::cerr << "unknown eos `" << eos_name << "'" << std::endl;
return 1;
Expand Down Expand Up @@ -696,6 +806,7 @@ int main(int argc, char **argv)
}
CALIPER(CALI_MARK_END("Cycle");)
MPI_CALL(MPI_Barrier(MPI_COMM_WORLD));
printMemory(allocator_names);
}
#ifdef USE_AMS
delete[] workflow;
Expand Down
60 changes: 24 additions & 36 deletions src/AMS.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,23 @@
* SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
*/

#include "AMS.h"

#include <vector>

#include "AMS.h"
#include "wf/resource_manager.hpp"
#include "wf/workflow.hpp"

struct AMSWrap{
struct AMSWrap {
std::vector<std::pair<AMSDType, void *>> executors;
~AMSWrap() {
for ( auto E : executors ){
if ( E.second != nullptr ){
if ( E.first == AMSDType::Double ){
delete reinterpret_cast<ams::AMSWorkflow<double> *> (E.second);
} else{
delete reinterpret_cast<ams::AMSWorkflow<float> *> (E.second);
~AMSWrap()
{
for (auto E : executors) {
if (E.second != nullptr) {
if (E.first == AMSDType::Double) {
delete reinterpret_cast<ams::AMSWorkflow<double> *>(E.second);
} else {
delete reinterpret_cast<ams::AMSWorkflow<float> *>(E.second);
}
}
}
Expand All @@ -36,6 +39,9 @@ void _AMSExecute(AMSExecutor executor,
int outputDim,
MPI_Comm Comm = 0)
{
static std::once_flag flag;
std::call_once(flag, [&]() { ams::ResourceManager::init(); });

uint64_t index = reinterpret_cast<uint64_t>(executor);

if (index >= _amsWrap.executors.size())
Expand Down Expand Up @@ -81,15 +87,16 @@ AMSExecutor AMSCreateExecutor(const AMSConfig config)
config.SPath,
config.DBPath,
config.dbType,
config.device == AMSResourceType::HOST,
config.device,
config.threshold,
config.uqPolicy,
config.nClusters,
config.pId,
config.wSize,
config.ePolicy);

_amsWrap.executors.push_back(std::make_pair(config.dType, static_cast<void *>(dWF)));
_amsWrap.executors.push_back(
std::make_pair(config.dType, static_cast<void *>(dWF)));
return reinterpret_cast<AMSExecutor>(_amsWrap.executors.size() - 1L);
} else if (config.dType == AMSDType::Single) {
ams::AMSWorkflow<float> *sWF =
Expand All @@ -98,14 +105,15 @@ AMSExecutor AMSCreateExecutor(const AMSConfig config)
config.SPath,
config.DBPath,
config.dbType,
config.device == AMSResourceType::HOST,
config.device,
static_cast<float>(config.threshold),
config.uqPolicy,
config.nClusters,
config.pId,
config.wSize,
config.ePolicy);
_amsWrap.executors.push_back(std::make_pair(config.dType, static_cast<void *>(sWF)));
_amsWrap.executors.push_back(
std::make_pair(config.dType, static_cast<void *>(sWF)));

return reinterpret_cast<AMSExecutor>(_amsWrap.executors.size() - 1L);
} else {
Expand Down Expand Up @@ -155,32 +163,12 @@ void AMSDistributedExecute(AMSExecutor executor,

const char *AMSGetAllocatorName(AMSResourceType device)
{
if (device == AMSResourceType::HOST) {
return ams::ResourceManager::getHostAllocatorName();
} else if (device == AMSResourceType::DEVICE) {
return ams::ResourceManager::getDeviceAllocatorName();
}

throw std::runtime_error("requested Device Allocator does not exist");

return nullptr;
}

void AMSSetupAllocator(const AMSResourceType Resource)
{
ams::ResourceManager::setup(Resource);
}

void AMSResourceInfo() { ams::ResourceManager::list_allocators(); }

int AMSGetLocationId(void *ptr)
{
return ams::ResourceManager::getDataAllocationId(ptr);
return std::move(ams::ResourceManager::getAllocatorName(device)).c_str();
}

void AMSSetDefaultAllocator(const AMSResourceType device)
void AMSSetAllocator(AMSResourceType resource, const char *alloc_name)
{
ams::ResourceManager::setDefaultDataAllocator(device);
ams::ResourceManager::setAllocator(std::string(alloc_name), resource);
}

#ifdef __cplusplus
Expand Down
9 changes: 3 additions & 6 deletions src/include/AMS.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ typedef enum { UBALANCED = 0, BALANCED } AMSExecPolicy;
typedef enum { None = 0, CSV, REDIS, HDF5, RMQ } AMSDBType;

typedef enum {
FAISSMean =0,
FAISSMean = 0,
FAISSMax,
DeltaUQ // Not supported
DeltaUQ // Not supported
} AMSUQPolicy;

typedef struct ams_conf {
Expand Down Expand Up @@ -105,11 +105,8 @@ void AMSDestroyExecutor(AMSExecutor executor);
int AMSSetCommunicator(MPI_Comm Comm);
#endif

void AMSSetAllocator(AMSResourceType resource, const char *alloc_name);
const char *AMSGetAllocatorName(AMSResourceType device);
void AMSSetupAllocator(const AMSResourceType device);
void AMSSetDefaultAllocator(const AMSResourceType device);
void AMSResourceInfo();
int AMSGetLocationId(void *ptr);

#ifdef __cplusplus
}
Expand Down
Loading

0 comments on commit 0afc1b4

Please sign in to comment.