From 31052cb3b8eaa8e400471c9655aa5477cb694f03 Mon Sep 17 00:00:00 2001 From: Pratik Mishra Date: Mon, 7 Oct 2024 12:03:35 +0530 Subject: [PATCH] feat: add cache in cac client --- Cargo.lock | 107 ++++++++++++++++++ Cargo.toml | 5 +- clients/go/cacclient/main.go | 24 +++- clients/go/expclient/main.go | 2 +- clients/go/main.go | 6 +- clients/haskell/hs-cac-client/src/Client.hs | 27 +++-- clients/haskell/hs-cac-client/src/Main.hs | 2 +- .../src/main/java/cac_client/CacClient.java | 10 +- .../src/main/java/example/Demo.java | 2 +- .../python/cac_client/cac_client/client.py | 16 ++- crates/cac_client/Cargo.toml | 2 + crates/cac_client/src/interface.rs | 24 +++- crates/cac_client/src/lib.rs | 97 +++++++++++++--- crates/cac_client/src/utils.rs | 86 ++++++++++++++ crates/context_aware_config/Cargo.toml | 6 +- .../src/api/context/handlers.rs | 4 +- crates/context_aware_config/src/helpers.rs | 80 +------------ .../src/main.rs | 3 + headers/libcac_client.h | 7 +- 19 files changed, 379 insertions(+), 131 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 757a5d13..c8ad8fe3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -958,8 +958,10 @@ dependencies = [ "cbindgen", "chrono", "derive_more", + "itertools 0.10.5", "jsonlogic", "log", + "mini-moka", "once_cell", "reqwest", "serde_json", @@ -1025,6 +1027,31 @@ name = "camino" version = "1.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c59e92b5a388f549b863a7bea62612c09f24c8393560709a54558a9abdfb3b9c" +dependencies = [ + "serde", +] + +[[package]] +name = "cargo-platform" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24b1f0365a6c6bb4020cd05806fd0d33c44d38046b8bd7f0e40814b9763cabfc" +dependencies = [ + "serde", +] + +[[package]] +name = "cargo_metadata" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4acbb09d9ee8e23699b9634375c72795d095bf268439da88562cf9b501f181fa" +dependencies = [ + "camino", + "cargo-platform", + "semver", + "serde", + "serde_json", +] [[package]] name = "cbindgen" @@ -1341,6 +1368,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-queue" version = "0.3.11" @@ -1619,6 +1655,15 @@ dependencies = [ "libc", ] +[[package]] +name = "error-chain" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d2f06b9cac1506ece98fe3231e3cc9c4410ec3d5b1f24ae1c8946f0742cdefc" +dependencies = [ + "version_check", +] + [[package]] name = "experimentation_client" version = "0.9.0" @@ -1933,6 +1978,12 @@ version = "0.27.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6c80984affa11d98d1b88b66ac8853f143217b399d3c74116778ff8fdb4ed2e" +[[package]] +name = "glob" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" + [[package]] name = "gloo-net" version = "0.5.0" @@ -2772,6 +2823,21 @@ dependencies = [ "unicase", ] +[[package]] +name = "mini-moka" +version = "0.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c325dfab65f261f386debee8b0969da215b3fa0037e74c8a1234db7ba986d803" +dependencies = [ + "crossbeam-channel", + "crossbeam-utils", + "dashmap", + "skeptic", + "smallvec", + "tagptr", + "triomphe", +] + [[package]] name = "minimal-lexical" version = "0.2.1" @@ -3233,6 +3299,17 @@ dependencies = [ "yansi", ] +[[package]] +name = "pulldown-cmark" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57206b407293d2bcd3af849ce869d52068623f19e1b5ff8e8778e3309439682b" +dependencies = [ + "bitflags 2.3.1", + "memchr", + "unicase", +] + [[package]] name = "quote" version = "1.0.35" @@ -3633,6 +3710,9 @@ name = "semver" version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bebd363326d05ec3e2f532ab7660680f3b02130d780c299bca73469d521bc0ed" +dependencies = [ + "serde", +] [[package]] name = "send_wrapper" @@ -3847,6 +3927,21 @@ dependencies = [ "libc", ] +[[package]] +name = "skeptic" +version = "0.13.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16d23b015676c90a0f01c197bfdc786c20342c73a0afdda9025adb0bc42940a8" +dependencies = [ + "bytecount", + "cargo_metadata", + "error-chain", + "glob", + "pulldown-cmark", + "tempfile", + "walkdir", +] + [[package]] name = "slab" version = "0.4.8" @@ -4101,6 +4196,12 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "tagptr" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" + [[package]] name = "tempfile" version = "3.6.0" @@ -4349,6 +4450,12 @@ dependencies = [ "once_cell", ] +[[package]] +name = "triomphe" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef8f7726da4807b58ea5c96fdc122f80702030edc33b35aff9190a51148ccc85" + [[package]] name = "try-lock" version = "0.2.4" diff --git a/Cargo.toml b/Cargo.toml index eb549af3..baf29c5c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,7 @@ assets-dir = "crates/frontend/assets" actix-web = "4.5.0" anyhow = "1.0.75" base64 = "0.21.2" +cfg-if = "1.0.0" chrono = { version = "0.4.26", features = ["serde"] } derive_more = "^0.99" diesel = { version = "2.1.0", features = [ @@ -43,6 +44,8 @@ diesel = { version = "2.1.0", features = [ "uuid", "postgres_backend", ] } +fred = { version = "9.2.1" } +itertools = { version = "0.10.5" } jsonlogic = { git = "https://github.com/juspay/jsonlogic_rs.git", version = "0.5.3" } jsonschema = "~0.17" leptos = { version = "0.6.11" } @@ -57,8 +60,6 @@ strum = "0.25" strum_macros = "0.25" toml = { version = "0.8.8", features = ["preserve_order"] } uuid = { version = "1.3.4", features = ["v4", "serde"] } -cfg-if = "1.0.0" -fred = { version = "9.2.1" } [workspace.lints.clippy] mod_module_files = "warn" diff --git a/clients/go/cacclient/main.go b/clients/go/cacclient/main.go index 56acb206..2bde1df7 100644 --- a/clients/go/cacclient/main.go +++ b/clients/go/cacclient/main.go @@ -1,7 +1,7 @@ package cacclient /* -#include "libcac_client.h" +#include "../../../headers/libcac_client.h" */ import "C" import ( @@ -28,15 +28,33 @@ type CacClient struct { } // NewCacClient creates a new CacClient -func NewCacClient(tenantName string, pollingFrequency int, cacHostName string) (*CacClient, error) { +func NewCacClient(tenantName string, pollingFrequency int, cacHostName string, cacheMaxCapacity *int, cacheTTL *int, cacheTTI *int) (*CacClient, error) { if tenantName == "" { return nil, errors.New("tenantName cannot be Empty") } if cacHostName == "" { return nil, errors.New("cacHostName cannot be Empty") } + var cacheCapacity *C.ulong + var cacheTimeToLive *C.ulong + var cacheTimeToIdle *C.ulong + if cacheMaxCapacity != nil { + cacheCapacity = (*C.ulong)(unsafe.Pointer(cacheMaxCapacity)) + } else { + cacheCapacity = nil + } + if cacheTTL != nil { + cacheTimeToLive = (*C.ulong)(unsafe.Pointer(cacheTTL)) + } else { + cacheTimeToLive = nil + } + if cacheTTI != nil { + cacheTimeToIdle = (*C.ulong)(unsafe.Pointer(cacheTTI)) + } else { + cacheTimeToIdle = nil + } client := &CacClient{tenant: tenantName, pollingFrequency: pollingFrequency, cacHostName: cacHostName, delimiter: ","} - resp := C.cac_new_client(C.CString(tenantName), C.ulong(pollingFrequency), C.CString(cacHostName)) + resp := C.cac_new_client(C.CString(tenantName), C.ulong(pollingFrequency), C.CString(cacHostName), cacheCapacity, cacheTimeToLive, cacheTimeToIdle) if resp == 1 { errorMessage := client.GetLastErrorMessage() fmt.Printf("Some Error Occur while creating new client: %s\n", errorMessage) diff --git a/clients/go/expclient/main.go b/clients/go/expclient/main.go index 3f11cddf..8fd8381c 100644 --- a/clients/go/expclient/main.go +++ b/clients/go/expclient/main.go @@ -1,7 +1,7 @@ package expclient /* -#include "libexperimentation_client.h" +#include "../../../headers/libexperimentation_client.h" */ import "C" import ( diff --git a/clients/go/main.go b/clients/go/main.go index f904cb29..f0ac7edd 100644 --- a/clients/go/main.go +++ b/clients/go/main.go @@ -12,18 +12,18 @@ func main() { pollingFrequency := 1 cachostName := "http://localhost:8080" - client, error := cacclient.NewCacClient(tenant, pollingFrequency, cachostName) + client, error := cacclient.NewCacClient(tenant, pollingFrequency, cachostName, nil, nil, nil) if error != nil { fmt.Println(error) } - fmt.Println("\n------------Configs----------------------------\n") + fmt.Println("\n------------Configs----------------------------") fmt.Println("Default Configs => ", client.GetConfig(nil, nil)) fmt.Println("Resolved Config => ", client.GetResolvedConfig(map[string]string{}, nil, cacclient.MERGE)) fmt.Println("Default Config => ", client.GetDefaultConfig(&[]string{})) - fmt.Println("\n------------Experiments----------------------------\n") + fmt.Println("\n------------Experiments----------------------------") expClient, error1 := expclient.NewExperimentationClient(tenant, pollingFrequency, cachostName) if error1 != nil { diff --git a/clients/haskell/hs-cac-client/src/Client.hs b/clients/haskell/hs-cac-client/src/Client.hs index f1350039..3c57484a 100644 --- a/clients/haskell/hs-cac-client/src/Client.hs +++ b/clients/haskell/hs-cac-client/src/Client.hs @@ -20,10 +20,11 @@ import Data.List (intercalate) import Foreign.C.String (CString, newCAString, peekCAString) import Foreign.C.Types (CInt (CInt), CULong (..)) import Foreign.ForeignPtr -import Foreign.Marshal.Alloc (free) +import Foreign.Marshal.Alloc (malloc, free) import Foreign.Marshal.Array (withArrayLen) import Foreign.Ptr import Prelude +import Foreign.Storable (poke) data Arc_Client @@ -35,7 +36,7 @@ type Tenant = String type Error = String foreign import ccall unsafe "cac_new_client" - c_new_cac_client :: CTenant -> CULong -> CString -> IO CInt + c_new_cac_client :: CTenant -> CULong -> CString -> Ptr CULong -> Ptr CULong -> Ptr CULong -> IO CInt foreign import ccall unsafe "&cac_free_client" c_free_cac_client :: FunPtr (Ptr CacClient -> IO ()) @@ -80,12 +81,22 @@ getError = c_last_error_message cleanup :: [Ptr a] -> IO () cleanup items = mapM free items $> () -createCacClient:: Tenant -> Integer -> String -> IO (Either Error ()) -createCacClient tenant frequency hostname = do +allocateCLongPtr :: Maybe Integer -> IO (Ptr CULong) +allocateCLongPtr Nothing = return nullPtr +allocateCLongPtr (Just val) = do + ptr <- malloc :: IO (Ptr CULong) -- Allocate memory for CULong + poke ptr (fromInteger val :: CULong) -- Store the value + return ptr -- Return the pointer + +createCacClient:: Tenant -> Integer -> String -> Maybe Integer -> Maybe Integer -> Maybe Integer -> IO (Either Error ()) +createCacClient tenant frequency hostname cacheMaxCapacity cacheTTL cacheTTI = do let duration = fromInteger frequency cTenant <- newCAString tenant cHostname <- newCAString hostname - resp <- c_new_cac_client cTenant duration cHostname + cacheCapacity <- allocateCLongPtr cacheMaxCapacity + cacheTimeToLive <- allocateCLongPtr cacheTTL + cacheTimeToIdle <- allocateCLongPtr cacheTTI + resp <- c_new_cac_client cTenant duration cHostname cacheCapacity cacheTimeToLive cacheTimeToIdle _ <- cleanup [cTenant, cHostname] case resp of 0 -> pure $ Right () @@ -108,7 +119,7 @@ getFullConfigStateWithFilter client mbFilters mbPrefix = do cPrefix <- case mbPrefix of Just prefix -> newCAString (intercalate "," prefix) Nothing -> return nullPtr - config <- withForeignPtr client $ \client -> c_get_config client cFilters cPrefix + config <- withForeignPtr client $ \val -> c_get_config val cFilters cPrefix _ <- cleanup [cFilters] if config == nullPtr then Left <$> getError @@ -132,7 +143,7 @@ getResolvedConfigWithStrategy client context mbKeys mergeStrat = do cStrKeys <- case mbKeys of Just keys -> newCAString (intercalate "|" keys) Nothing -> return nullPtr - overrides <- withForeignPtr client $ \client -> c_cac_get_resolved_config client cContext cStrKeys cMergeStrat + overrides <- withForeignPtr client $ \val -> c_cac_get_resolved_config val cContext cStrKeys cMergeStrat _ <- cleanup [cContext, cStrKeys] if overrides == nullPtr then Left <$> getError @@ -145,7 +156,7 @@ getDefaultConfig client mbKeys = do cStrKeys <- case mbKeys of Just keys -> newCAString (intercalate "|" keys) Nothing -> return nullPtr - overrides <- withForeignPtr client $ \client -> c_cac_get_default_config client cStrKeys + overrides <- withForeignPtr client $ \val -> c_cac_get_default_config val cStrKeys _ <- cleanup [cStrKeys] if overrides == nullPtr then Left <$> getError diff --git a/clients/haskell/hs-cac-client/src/Main.hs b/clients/haskell/hs-cac-client/src/Main.hs index 0e9cd698..a72fbdb6 100644 --- a/clients/haskell/hs-cac-client/src/Main.hs +++ b/clients/haskell/hs-cac-client/src/Main.hs @@ -11,7 +11,7 @@ import Prelude main :: IO () main = do - createCacClient "dev" 10 "http://localhost:8080" >>= \case + createCacClient "dev" 10 "http://localhost:8080" Nothing Nothing Nothing>>= \case Left err -> putStrLn err Right _ -> pure () threadId <- forkOS (cacStartPolling "dev") diff --git a/clients/java/cac-client/src/main/java/cac_client/CacClient.java b/clients/java/cac-client/src/main/java/cac_client/CacClient.java index 3a8ac76e..16115ba8 100644 --- a/clients/java/cac-client/src/main/java/cac_client/CacClient.java +++ b/clients/java/cac-client/src/main/java/cac_client/CacClient.java @@ -10,7 +10,7 @@ public class CacClient { public interface RustLib { String cac_last_error_message(); - int cac_new_client(String tenant, long updateFrequency, String hostName); + int cac_new_client(String tenant, long updateFrequency, String hostName, Pointer cacheMaxCapacity, Pointer cacheTTL, Pointer cacheTTI); void cac_free_client(Pointer ptr); @@ -39,8 +39,12 @@ public CacClient() { CacClient.rustLib = LibraryLoader.create(RustLib.class).load(libraryName); } - public int cacNewClient(String tenant, long updateFrequency, String hostName) throws CACClientException { - int result = rustLib.cac_new_client(tenant, updateFrequency, hostName); + public int cacNewClient(String tenant, long updateFrequency, String hostName, Long cacheMaxCapacity, Long cacheTTL, Long cacheTTI) throws CACClientException { + Pointer cacheMaxCapacityPointer = cacheMaxCapacity != null ? new Pointer(cacheMaxCapacity) : null; + Pointer cacheTTLPointer = cacheTTL != null ? new Pointer(cacheTTL) : null; + Pointer cacheTTIPointer = cacheTTI != null ? new Pointer(cacheTTI) : null; + + int result = rustLib.cac_new_client(tenant, updateFrequency, hostName, cacheMaxCapacityPointer, cacheTTLPointer, cacheTTIPointer); if (result > 0) { String errorMessage = rustLib.cac_last_error_message(); throw new CACClientException("Failed to create new CAC client: " + errorMessage); diff --git a/clients/java/cac-client/src/main/java/example/Demo.java b/clients/java/cac-client/src/main/java/example/Demo.java index 16c26783..9c7c4375 100644 --- a/clients/java/cac-client/src/main/java/example/Demo.java +++ b/clients/java/cac-client/src/main/java/example/Demo.java @@ -18,7 +18,7 @@ private static void callCacClient() { int newClient; try { - newClient = wrapper.cacNewClient(tenant, 1, "http://localhost:8080"); + newClient = wrapper.cacNewClient(tenant, 1, "http://localhost:8080", null, null, null); System.out.println("New client created successfully. Client ID: " + newClient); } catch (cac_client.CACClientException e) { System.err.println(e.getMessage()); diff --git a/clients/python/cac_client/cac_client/client.py b/clients/python/cac_client/cac_client/client.py index 69f1e3e2..bc184228 100644 --- a/clients/python/cac_client/cac_client/client.py +++ b/clients/python/cac_client/cac_client/client.py @@ -35,7 +35,7 @@ def __init__(self, config_dict): class CacClient: rust_lib = ctypes.CDLL(lib_path) - rust_lib.cac_new_client.argtypes = [ctypes.c_char_p, ctypes.c_int, ctypes.c_char_p] + rust_lib.cac_new_client.argtypes = [ctypes.c_char_p, ctypes.c_int, ctypes.c_char_p, ctypes.POINTER(ctypes.c_int), ctypes.POINTER(ctypes.c_int), ctypes.POINTER(ctypes.c_int)] rust_lib.cac_new_client.restype = ctypes.c_int rust_lib.cac_get_client.argtypes = [ctypes.c_char_p] @@ -66,16 +66,24 @@ class CacClient: rust_lib.cac_get_default_config.argtypes = [ctypes.c_char_p, ctypes.c_char_p] rust_lib.cac_get_default_config.restype = ctypes.c_char_p - def __init__(self, tenant_name: str, polling_frequency: int, cac_host_name: str): + def __init__(self, tenant_name: str, polling_frequency: int, cac_host_name: str, cache_max_capacity: int| None = None, cache_ttl: int| None = None, cache_tti: int| None = None): if not tenant_name or not cac_host_name: raise ValueError("tenantName cannot be null or empty") self.tenant = tenant_name self.polling_frequency = polling_frequency self.cac_host_name = cac_host_name + self.cache_max_capacity = cache_max_capacity + self.cache_ttl = cache_ttl + self.cache_tti = cache_tti + + max_capacity_ptr = ctypes.pointer(ctypes.c_int(cache_max_capacity)) if cache_max_capacity is not None else ctypes.POINTER(ctypes.c_int)() + ttl_ptr = ctypes.pointer(ctypes.c_int(cache_ttl)) if cache_ttl is not None else ctypes.POINTER(ctypes.c_int)() + tti_ptr = ctypes.pointer(ctypes.c_int(cache_tti)) if cache_tti is not None else ctypes.POINTER(ctypes.c_int)() + resp = self.rust_lib.cac_new_client( - self.tenant.encode(), self.polling_frequency, self.cac_host_name.encode()) + self.tenant.encode(), self.polling_frequency, self.cac_host_name.encode(), max_capacity_ptr, ttl_ptr, tti_ptr) if resp == 1: error_message = self.get_cac_last_error_message() raise Exception("Error Occured while creating new client ", error_message) @@ -101,8 +109,6 @@ def get_cac_config(self, filter_query: str | None = None, filter_prefix: str | N filter_query_ptr = None if filter_query is None else filter_query.encode() try: result = self.rust_lib.cac_get_config(client_ptr, filter_query_ptr, filter_prefix_ptr).decode() - print("pppp", result) - print(ast.literal_eval(result)) return Config(ast.literal_eval(result)) except: raise Exception(self.rust_lib.get_cac_last_error_message()) diff --git a/crates/cac_client/Cargo.toml b/crates/cac_client/Cargo.toml index aef18e37..75bc0e45 100644 --- a/crates/cac_client/Cargo.toml +++ b/crates/cac_client/Cargo.toml @@ -10,8 +10,10 @@ build = "build.rs" actix-web = { workspace = true } chrono = { workspace = true } derive_more = { workspace = true } +itertools = { workspace = true } jsonlogic = { workspace = true } log = { workspace = true } +mini-moka = { version = "0.10.3" } once_cell = { workspace = true } reqwest = { workspace = true } serde_json = { workspace = true } diff --git a/crates/cac_client/src/interface.rs b/crates/cac_client/src/interface.rs index 4615c2f5..6dce6632 100644 --- a/crates/cac_client/src/interface.rs +++ b/crates/cac_client/src/interface.rs @@ -52,6 +52,14 @@ fn cstring_to_rstring(s: *const c_char) -> Result { s.to_str().map(str::to_string).map_err_to_string() } +fn ptr_option_ulong_to_option_u64(v: *const c_ulong) -> Option { + if !v.is_null() { + Some(unsafe { *v as u64 }) + } else { + None + } +} + fn rstring_to_cstring(s: String) -> CString { CString::new(s.as_str()).unwrap_or_default() } @@ -102,15 +110,27 @@ pub extern "C" fn cac_new_client( tenant: *const c_char, update_frequency: c_ulong, hostname: *const c_char, + cache_max_capacity: *const c_ulong, + cache_ttl: *const c_ulong, + cache_tti: *const c_ulong, ) -> c_int { let duration = Duration::new(update_frequency, 0); let tenant = unwrap_safe!(cstring_to_rstring(tenant), return 1); let hostname = unwrap_safe!(cstring_to_rstring(hostname), return 1); - + let cache_max_capacity = ptr_option_ulong_to_option_u64(cache_max_capacity); + let cache_ttl = ptr_option_ulong_to_option_u64(cache_ttl); + let cache_tti = ptr_option_ulong_to_option_u64(cache_tti); // println!("Creating cac client thread for tenant {tenant}"); CAC_RUNTIME.block_on(async move { match CLIENT_FACTORY - .create_client(tenant.clone(), duration, hostname) + .create_client( + tenant.clone(), + duration, + hostname, + cache_max_capacity, + cache_ttl, + cache_tti, + ) .await { Ok(_) => 0, diff --git a/crates/cac_client/src/lib.rs b/crates/cac_client/src/lib.rs index a68d18bf..2a8e64ea 100644 --- a/crates/cac_client/src/lib.rs +++ b/crates/cac_client/src/lib.rs @@ -1,7 +1,7 @@ #![deny(unused_crate_dependencies)] mod eval; mod interface; -mod utils; +pub mod utils; use std::{ collections::{HashMap, HashSet}, @@ -13,13 +13,19 @@ use std::{ use actix_web::{rt::time::interval, web::Data}; use chrono::{DateTime, Utc}; use derive_more::{Deref, DerefMut}; +use mini_moka::sync::Cache; use reqwest::{RequestBuilder, Response, StatusCode}; use serde_json::{Map, Value}; +use strum_macros::Display; use superposition_types::{Config, Context}; use tokio::sync::RwLock; -use utils::core::MapError; +use utils::{core::MapError, json_to_sorted_string}; -#[derive(strum_macros::EnumString)] +static CACHE_MAX_CAPACITY: u64 = 10 * 1024 * 1024; //in mb +static CACHE_TTL: u64 = 180 * 60; //in minutes +static CACHE_TTI: u64 = 30 * 60; //in minutes + +#[derive(strum_macros::EnumString, Clone, Display)] #[strum(serialize_all = "snake_case")] pub enum MergeStrategy { MERGE, @@ -50,6 +56,7 @@ pub struct Client { polling_interval: Duration, last_modified: Data>>, config: Data>, + config_cache: Cache>, } fn clone_reqw(reqw: &RequestBuilder) -> Result { @@ -70,10 +77,17 @@ fn get_last_modified(resp: &Response) -> Option> { } impl Client { + /** cache_max_capacity: Max size of cache in mb, default 10 mb + * cache_ttl: Time to live value in minutes, default 180 minutes + * cache_tti: Time to idle value in minutes, default 30 minutes + */ pub async fn new( tenant: String, polling_interval: Duration, hostname: String, + cache_max_capacity: Option, + cache_ttl: Option, + cache_tti: Option, ) -> Result { let reqw_client = reqwest::Client::builder().build().map_err_to_string()?; let cac_endpoint = format!("{hostname}/config"); @@ -88,7 +102,24 @@ impl Client { return Err("Invalid tenant".to_string()); } let config = resp.json::().await.map_err_to_string()?; - + let config_cache = Cache::builder() + .weigher(|_key, value: &Map| -> u32 { + Value::Object(value.to_owned()) + .to_string() + .len() + .try_into() + .unwrap_or(u32::MAX) + }) + // max size of cache in mb + .max_capacity( + cache_max_capacity.map_or(CACHE_MAX_CAPACITY, |v| v * 1024 * 1024), + ) + // Time to live (TTL): in minutes + .time_to_live(Duration::from_secs(cache_ttl.map_or(CACHE_TTL, |v| v * 60))) + // Time to idle (TTI): in minutes + .time_to_idle(Duration::from_secs(cache_tti.map_or(CACHE_TTI, |v| v * 60))) + // Create the cache. + .build(); let client = Client { tenant, reqw: Data::new(reqw), @@ -97,6 +128,7 @@ impl Client { last_modified_at.unwrap_or(DateTime::::from(UNIX_EPOCH)), )), config: Data::new(RwLock::new(config)), + config_cache, }; Ok(client) } @@ -175,18 +207,37 @@ impl Client { filter_keys: Option>, merge_strategy: MergeStrategy, ) -> Result, String> { - let cac = self.config.read().await; - let mut config = cac.to_owned(); - if let Some(keys) = filter_keys { - config = config.filter_by_prefix(&HashSet::from_iter(keys)); + let filter_string = if let Some(vec) = filter_keys.clone() { + let hset: HashSet = HashSet::from_iter(vec); + let mut vec: Vec = hset.iter().cloned().collect(); + vec.sort(); + vec.join(",") + } else { + "null".to_string() + }; + let hash_key = json_to_sorted_string(&Value::Object(query_data.clone())) + + "?" + + &merge_strategy.clone().to_string() + + "?" + + &filter_string; + if let Some(value) = self.config_cache.get(&hash_key) { + Ok(value) + } else { + let cac = self.config.read().await; + let mut config = cac.to_owned(); + if let Some(keys) = filter_keys { + config = config.filter_by_prefix(&HashSet::from_iter(keys)); + } + let evaled_cac = eval::eval_cac( + config.default_configs.to_owned(), + &config.contexts, + &config.overrides, + &query_data, + merge_strategy, + )?; + self.config_cache.insert(hash_key, evaled_cac.clone()); + Ok(evaled_cac) } - eval::eval_cac( - config.default_configs.to_owned(), - &config.contexts, - &config.overrides, - &query_data, - merge_strategy, - ) } pub async fn get_default_config( @@ -210,6 +261,9 @@ impl ClientFactory { tenant: String, polling_interval: Duration, hostname: String, + cache_max_capacity: Option, + cache_ttl: Option, + cache_tti: Option, ) -> Result, String> { let mut factory = self.write().await; @@ -217,8 +271,17 @@ impl ClientFactory { return Ok(client.clone()); } - let client = - Arc::new(Client::new(tenant.to_string(), polling_interval, hostname).await?); + let client = Arc::new( + Client::new( + tenant.to_string(), + polling_interval, + hostname, + cache_max_capacity, + cache_ttl, + cache_tti, + ) + .await?, + ); factory.insert(tenant.to_string(), client.clone()); Ok(client.clone()) } diff --git a/crates/cac_client/src/utils.rs b/crates/cac_client/src/utils.rs index 5a7ca06a..3fdae3be 100644 --- a/crates/cac_client/src/utils.rs +++ b/crates/cac_client/src/utils.rs @@ -1 +1,87 @@ +use itertools::{self, Itertools}; +use serde_json::Value; pub mod core; + +pub fn json_to_sorted_string(v: &Value) -> String { + match v { + Value::Object(m) => { + let mut new_str: String = String::from(""); + for (i, val) in m.iter().sorted_by_key(|item| item.0) { + let p: String = json_to_sorted_string(val); + new_str.push_str(i); + new_str.push_str(&String::from(":")); + new_str.push_str(&p); + new_str.push_str(&String::from("$")); + } + new_str + } + Value::String(m) => m.to_string(), + Value::Number(m) => m.to_string(), + Value::Bool(m) => m.to_string(), + Value::Null => String::from("null"), + Value::Array(m) => { + let mut new_vec = + m.iter().map(json_to_sorted_string).collect::>(); + new_vec.sort(); + new_vec.join(",") + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + #[test] + fn test_json_to_sorted_string() { + let first_condition: Value = json!({ + "and": [ + { + "==": [ + { + "var": "os" + }, + "android" + ] + }, + { + "==": [ + { + "var": "clientId" + }, + "geddit" + ] + } + ] + }); + + let second_condition: Value = json!({ + "and": [ + { + "==": [ + { + "var": "clientId" + }, + "geddit" + ] + }, + { + "==": [ + { + "var": "os" + }, + "android" + ] + } + ] + }); + let expected_string: String = + "and:==:android,var:os$$,==:geddit,var:clientId$$$".to_owned(); + assert_eq!(json_to_sorted_string(&first_condition), expected_string); + assert_eq!( + json_to_sorted_string(&first_condition), + json_to_sorted_string(&second_condition) + ); + } +} diff --git a/crates/context_aware_config/Cargo.toml b/crates/context_aware_config/Cargo.toml index 1d504171..20ddfb77 100644 --- a/crates/context_aware_config/Cargo.toml +++ b/crates/context_aware_config/Cargo.toml @@ -13,11 +13,13 @@ anyhow = { workspace = true } base64 = { workspace = true } blake3 = "1.3.3" cac_client = { path = "../cac_client" } +cfg-if = { workspace = true } chrono = { workspace = true } derive_more = { workspace = true } diesel = { workspace = true } +fred = { workspace = true, optional = true, features = ["metrics"]} futures-util = "0.3.28" -itertools = "0.10.5" +itertools = { workspace = true } jsonschema = { workspace = true } log = { workspace = true } serde = { workspace = true } @@ -31,8 +33,6 @@ superposition_types = { path = "../superposition_types", features = [ "server", ] } uuid = { workspace = true } -fred = { workspace = true, optional = true, features = ["metrics"]} -cfg-if = { workspace = true } [features] disable_db_data_validation = ["superposition_types/disable_db_data_validation"] diff --git a/crates/context_aware_config/src/api/context/handlers.rs b/crates/context_aware_config/src/api/context/handlers.rs index cae26bfc..246e3437 100644 --- a/crates/context_aware_config/src/api/context/handlers.rs +++ b/crates/context_aware_config/src/api/context/handlers.rs @@ -9,6 +9,7 @@ use actix_web::{ web::{Data, Json, Path}, HttpResponse, Scope, }; +use cac_client::utils::json_to_sorted_string; use chrono::Utc; use diesel::{ delete, @@ -53,8 +54,7 @@ use crate::{ }, }, helpers::{ - add_config_version, calculate_context_priority, json_to_sorted_string, - validate_context_jsonschema, + add_config_version, calculate_context_priority, validate_context_jsonschema, }, }; diff --git a/crates/context_aware_config/src/helpers.rs b/crates/context_aware_config/src/helpers.rs index b7164280..ac2ad2dc 100644 --- a/crates/context_aware_config/src/helpers.rs +++ b/crates/context_aware_config/src/helpers.rs @@ -11,7 +11,7 @@ use diesel::{ }; #[cfg(feature = "high-performance-mode")] use fred::interfaces::KeysInterface; -use itertools::{self, Itertools}; + use jsonschema::{Draft, JSONSchema, ValidationError}; use serde_json::{json, Map, Value}; #[cfg(feature = "high-performance-mode")] @@ -164,32 +164,6 @@ pub fn validate_jsonschema( res } -pub fn json_to_sorted_string(v: &Value) -> String { - match v { - Value::Object(m) => { - let mut new_str: String = String::from(""); - for (i, val) in m.iter().sorted_by_key(|item| item.0) { - let p: String = json_to_sorted_string(val); - new_str.push_str(i); - new_str.push_str(&String::from(":")); - new_str.push_str(&p); - new_str.push_str(&String::from("$")); - } - new_str - } - Value::String(m) => m.to_string(), - Value::Number(m) => m.to_string(), - Value::Bool(m) => m.to_string(), - Value::Null => String::from("null"), - Value::Array(m) => { - let mut new_vec = - m.iter().map(json_to_sorted_string).collect::>(); - new_vec.sort(); - new_vec.join(",") - } - } -} - pub fn calculate_context_priority( _object_key: &str, cond: &Value, @@ -385,58 +359,6 @@ mod tests { assert!(ok_enum_validation.is_ok()); } - #[test] - fn test_json_to_sorted_string() { - let first_condition: Value = json!({ - "and": [ - { - "==": [ - { - "var": "os" - }, - "android" - ] - }, - { - "==": [ - { - "var": "clientId" - }, - "geddit" - ] - } - ] - }); - - let second_condition: Value = json!({ - "and": [ - { - "==": [ - { - "var": "clientId" - }, - "geddit" - ] - }, - { - "==": [ - { - "var": "os" - }, - "android" - ] - } - ] - }); - let expected_string: String = - "and:==:android,var:os$$,==:geddit,var:clientId$$$".to_owned(); - assert_eq!(json_to_sorted_string(&first_condition), expected_string); - assert_eq!( - json_to_sorted_string(&first_condition), - json_to_sorted_string(&second_condition) - ); - } - #[test] fn test_validate_context_jsonschema() { let test_schema = json!({ diff --git a/examples/cac_client_integration_example/src/main.rs b/examples/cac_client_integration_example/src/main.rs index 66931fc8..cee10911 100644 --- a/examples/cac_client_integration_example/src/main.rs +++ b/examples/cac_client_integration_example/src/main.rs @@ -17,6 +17,9 @@ async fn main() -> std::io::Result<()> { "dev".to_string(), Duration::new(10, 0), "http://localhost:8080".into(), + None, + None, + None, ) .await .expect(format!("{}: Failed to acquire cac_client", "dev").as_str()) diff --git a/headers/libcac_client.h b/headers/libcac_client.h index 7b562bd1..d5f0db25 100644 --- a/headers/libcac_client.h +++ b/headers/libcac_client.h @@ -11,7 +11,12 @@ const char *cac_last_error_message(void); void cac_free_string(char *s); -int cac_new_client(const char *tenant, unsigned long update_frequency, const char *hostname); +int cac_new_client(const char *tenant, + unsigned long update_frequency, + const char *hostname, + const unsigned long *cache_max_capacity, + const unsigned long *cache_ttl, + const unsigned long *cache_tti); void cac_start_polling_update(const char *tenant);