Skip to content

Commit

Permalink
add snapshot while context , default configs updates
Browse files Browse the repository at this point in the history
  • Loading branch information
pratikmishra356 committed Apr 24, 2024
1 parent cbd5b6f commit 2ff22dc
Show file tree
Hide file tree
Showing 10 changed files with 147 additions and 62 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- This file should undo anything in `up.sql`
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- Your SQL goes here
-- Name: functions; Type: TABLE; Schema: public; Owner: -
--
CREATE TABLE public.config_verions (
id text PRIMARY KEY,
config text,
created_at timestamp without time zone DEFAULT CURRENT_TIMESTAMP NOT NULL
);
--
15 changes: 8 additions & 7 deletions crates/context_aware_config/src/api/config/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,24 +98,25 @@ fn is_not_modified(max_created_at: Option<NaiveDateTime>, req: &HttpRequest) ->
max_created_at.is_some() && parsed_max <= last_modified
}

async fn generate_cac(
pub fn generate_cac(
conn: &mut PooledConnection<ConnectionManager<PgConnection>>,
) -> superposition::Result<Config> {
let contexts_vec = ctxt::contexts
.select((ctxt::id, ctxt::value, ctxt::override_id, ctxt::override_))
.select((ctxt::id, ctxt::value, ctxt::priority, ctxt::override_id, ctxt::override_))
.order_by((ctxt::priority.asc(), ctxt::created_at.asc()))
.load::<(String, Value, String, Value)>(conn)
.load::<(String, Value, i32, String, Value)>(conn)
.map_err(|err| {
log::error!("failed to fetch contexts with error: {}", err);
db_error!(err)
})?;

let (contexts, overrides) = contexts_vec.into_iter().fold(
(Vec::new(), Map::new()),
|(mut ctxts, mut overrides), (id, condition, override_id, override_)| {
|(mut ctxts, mut overrides), (id, condition, priority_ , override_id, override_)| {
let ctxt = super::types::Context {
id,
condition,
priority:priority_,
override_with_keys: [override_id.to_owned()],
};
ctxts.push(ctxt);
Expand Down Expand Up @@ -182,7 +183,7 @@ async fn get(
);
}

let mut config = generate_cac(&mut conn).await?;
let mut config = generate_cac(&mut conn)?;
if let Some(prefix) = query_params_map.get("prefix") {
let prefix_list: HashSet<&str> = prefix
.as_str()
Expand Down Expand Up @@ -240,7 +241,7 @@ async fn get_resolved_config(
return Ok(HttpResponse::NotModified().finish());
}

let res = generate_cac(&mut conn).await?;
let res = generate_cac(&mut conn)?;

let cac_client_contexts = res
.contexts
Expand Down Expand Up @@ -313,7 +314,7 @@ async fn get_filtered_config(
.map_or_else(|_| json!(value), |int_val| json!(int_val)),
);
}
let config = generate_cac(&mut conn).await?;
let config = generate_cac(&mut conn)?;
let contexts = config.contexts;

let filtered_context = filter_context(&contexts, &query_params_map)?;
Expand Down
2 changes: 1 addition & 1 deletion crates/context_aware_config/src/api/config/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
mod handlers;
pub mod handlers;
mod types;
pub use handlers::endpoints;
mod helpers;
1 change: 1 addition & 0 deletions crates/context_aware_config/src/api/config/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ pub struct Config {
pub struct Context {
pub id: String,
pub condition: Value,
pub priority: i32,
pub override_with_keys: [String; 1],
}
93 changes: 58 additions & 35 deletions crates/context_aware_config/src/api/context/handlers.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
extern crate base64;
use std::str;

use crate::helpers::{json_to_sorted_string, validate_context_jsonschema};
use crate::helpers::{json_to_sorted_string, validate_context_jsonschema, add_config_version};
use crate::{
api::{
context::types::{
Expand All @@ -18,6 +18,9 @@ use crate::{
},
},
};
use actix_web::web::Data;
use service_utils::service::types::AppState;

use actix_web::{
delete, get, put,
web::{Json, Path, Query},
Expand Down Expand Up @@ -276,16 +279,25 @@ fn put(

#[put("")]
async fn put_handler(
state: Data<AppState>,
req: Json<PutReq>,
mut db_conn: DbConnection,
user: User,
) -> superposition::Result<Json<PutResp>> {
put(req, &mut db_conn, false, &user)

let mut snowflake_generator = state.snowflake_generator.lock().unwrap();
let version_id = snowflake_generator.real_time_generate();
db_conn.transaction::<_, superposition::AppError, _>(|transaction_conn| {
let put_response = put(req, transaction_conn, false, &user)
.map(|resp| Json(resp))
.map_err(|err: superposition::AppError| {
log::info!("context put failed with error: {:?}", err);
err
})
})?;
add_config_version(version_id, transaction_conn)?;
Ok(put_response)

})
}

fn r#move(
Expand Down Expand Up @@ -334,23 +346,15 @@ fn r#move(
};

let handle_unique_violation =
|db_conn: &mut DBConnection, already_under_txn: bool| {
if already_under_txn {
let deleted_ctxt = diesel::delete(dsl::contexts)
.filter(dsl::id.eq(&old_ctx_id))
.get_result(db_conn)?;

let ctx = contruct_new_ctx_with_old_overrides(deleted_ctxt);
update_override_of_existing_ctx(db_conn, ctx)
} else {
db_conn.build_transaction().read_write().run(|conn| {
let deleted_ctxt = diesel::delete(dsl::contexts)
.filter(dsl::id.eq(&old_ctx_id))
.get_result(conn)?;
let ctx = contruct_new_ctx_with_old_overrides(deleted_ctxt);
update_override_of_existing_ctx(conn, ctx)
})
}
|db_conn: &mut DBConnection| {

let deleted_ctxt = diesel::delete(dsl::contexts)
.filter(dsl::id.eq(&old_ctx_id))
.get_result(db_conn)?;

let ctx = contruct_new_ctx_with_old_overrides(deleted_ctxt);
update_override_of_existing_ctx(db_conn, ctx)

};

match context {
Expand All @@ -359,7 +363,7 @@ fn r#move(
if already_under_txn {
diesel::sql_query("ROLLBACK TO update_ctx_savepoint").execute(conn)?;
}
handle_unique_violation(conn, already_under_txn)
handle_unique_violation(conn)
}
Err(e) => {
log::error!("failed to move context with db error: {:?}", e);
Expand All @@ -370,17 +374,25 @@ fn r#move(

#[put("/move/{ctx_id}")]
async fn move_handler(
state: Data<AppState>,
path: Path<String>,
req: Json<MoveReq>,
mut db_conn: DbConnection,
user: User,
) -> superposition::Result<Json<PutResp>> {
r#move(path.into_inner(), req, &mut db_conn, false, &user)
let mut snowflake_generator = state.snowflake_generator.lock().unwrap();
let version_id = snowflake_generator.real_time_generate();
db_conn.transaction::<_, superposition::AppError, _>(|transaction_conn| {
let move_reponse = r#move(path.into_inner(), req, transaction_conn, false, &user)
.map(|resp| Json(resp))
.map_err(|err| {
log::info!("move api failed with error: {:?}", err);
err
})
})?;
add_config_version(version_id, transaction_conn)?;
Ok(move_reponse)
})

}

#[get("/{ctx_id}")]
Expand Down Expand Up @@ -434,6 +446,7 @@ async fn list_contexts(

#[delete("/{ctx_id}")]
async fn delete_context(
state: Data<AppState>,
path: Path<String>,
db_conn: DbConnection,
user: User,
Expand All @@ -442,29 +455,38 @@ async fn delete_context(
let DbConnection(mut conn) = db_conn;

let ctx_id = path.into_inner();
let deleted_row =
delete(dsl::contexts.filter(dsl::id.eq(&ctx_id))).execute(&mut conn);
match deleted_row {
Ok(0) => Err(not_found!("Context Id `{}` doesn't exists", ctx_id)),
Ok(_) => {
log::info!("{ctx_id} context deleted by {}", user.get_email());
Ok(HttpResponse::NoContent().finish())
}
Err(e) => {
log::error!("context delete query failed with error: {e}");
Err(unexpected_error!("Something went wrong."))
let mut snowflake_generator = state.snowflake_generator.lock().unwrap();
let version_id = snowflake_generator.real_time_generate();
conn.transaction::<_, superposition::AppError, _>(|transaction_conn| {

let deleted_row =
delete(dsl::contexts.filter(dsl::id.eq(&ctx_id))).execute(transaction_conn);
match deleted_row {
Ok(0) => Err(not_found!("Context Id `{}` doesn't exists", ctx_id)),
Ok(_) => {
add_config_version(version_id, transaction_conn)?;
log::info!("{ctx_id} context deleted by {}", user.get_email());
Ok(HttpResponse::NoContent().finish())
}
Err(e) => {
log::error!("context delete query failed with error: {e}");
Err(unexpected_error!("Something went wrong."))
}
}
}
})
}

#[put("/bulk-operations")]
async fn bulk_operations(
state: Data<AppState>,
reqs: Json<Vec<ContextAction>>,
db_conn: DbConnection,
user: User,
) -> superposition::Result<Json<Vec<ContextBulkResponse>>> {
use contexts::dsl::contexts;
let DbConnection(mut conn) = db_conn;
let mut snowflake_generator = state.snowflake_generator.lock().unwrap();
let version_id = snowflake_generator.real_time_generate();

let mut response = Vec::<ContextBulkResponse>::new();
conn.transaction::<_, superposition::AppError, _>(|transaction_conn| {
Expand Down Expand Up @@ -519,6 +541,7 @@ async fn bulk_operations(
}
}
}
add_config_version(version_id, transaction_conn)?;
Ok(()) // Commit the transaction
})?;
Ok(Json(response))
Expand Down
44 changes: 26 additions & 18 deletions crates/context_aware_config/src/api/default_config/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ use crate::api::context::helpers::validate_value_with_function;
use crate::{
api::functions::helpers::get_published_function_code,
db::{self, models::DefaultConfig, schema::default_configs::dsl::default_configs},
helpers::validate_jsonschema,
helpers::{validate_jsonschema,add_config_version}
};
use diesel::Connection;
use actix_web::{
get, put,
web::{self, Data, Json},
Expand Down Expand Up @@ -139,25 +140,32 @@ async fn create(
)?;
}
}
let mut snowflake_generator = state.snowflake_generator.lock().unwrap();
let version_id = snowflake_generator.real_time_generate();
conn.transaction::<_, superposition::AppError, _>(|transaction_conn| {
let upsert = diesel::insert_into(default_configs)
.values(&default_config)
.on_conflict(db::schema::default_configs::key)
.do_update()
.set(&default_config)
.execute(transaction_conn);


add_config_version(version_id, transaction_conn)?;
match upsert {
Ok(_) => Ok(HttpResponse::Ok().json(json!({
"message": "DefaultConfig created/updated successfully."
}))),
Err(e) => {
log::info!("DefaultConfig creation failed with error: {e}");
Err(unexpected_error!(
"Something went wrong, failed to create DefaultConfig"
))
}

let upsert = diesel::insert_into(default_configs)
.values(&default_config)
.on_conflict(db::schema::default_configs::key)
.do_update()
.set(&default_config)
.execute(&mut conn);

match upsert {
Ok(_) => Ok(HttpResponse::Ok().json(json!({
"message": "DefaultConfig created/updated successfully."
}))),
Err(e) => {
log::info!("DefaultConfig creation failed with error: {e}");
Err(unexpected_error!(
"Something went wrong, failed to create DefaultConfig"
))
}
}

})
}

fn fetch_default_key(
Expand Down
11 changes: 10 additions & 1 deletion crates/context_aware_config/src/db/models.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::db::schema::{contexts, default_configs, dimensions, event_log, functions};
use crate::db::schema::{contexts, default_configs, dimensions, event_log, functions, config_versions};
use chrono::{offset::Utc, DateTime, NaiveDateTime};
use diesel::{AsChangeset, Insertable, Queryable, Selectable};
use serde::Serialize;
Expand Down Expand Up @@ -74,3 +74,12 @@ pub struct EventLog {
pub new_data: Option<Value>,
pub query: String,
}

#[derive(Queryable, Selectable, Insertable, AsChangeset, Serialize, Clone, Debug)]
#[diesel(check_for_backend(diesel::pg::Pg))]
#[diesel(primary_key(id))]
pub struct ConfigVersion {
pub id: String,
pub config: Value,
pub created_at: NaiveDateTime,
}
9 changes: 9 additions & 0 deletions crates/context_aware_config/src/db/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,14 @@ diesel::table! {
diesel::joinable!(default_configs -> functions (function_name));
diesel::joinable!(dimensions -> functions (function_name));

diesel::table! {
config_versions (id) {
id -> Text,
config -> Json,
created_at -> Timestamp,
}
}

diesel::allow_tables_to_appear_in_same_query!(
contexts,
default_configs,
Expand Down Expand Up @@ -648,4 +656,5 @@ diesel::allow_tables_to_appear_in_same_query!(
event_log_y2026m11,
event_log_y2026m12,
functions,
config_versions,
);
Loading

0 comments on commit 2ff22dc

Please sign in to comment.