Skip to content

Commit

Permalink
feat: Webhook trigger for experiments
Browse files Browse the repository at this point in the history
  • Loading branch information
Ankit Mahato authored and Ankit Mahato committed Oct 17, 2024
1 parent 8ca8135 commit de69732
Show file tree
Hide file tree
Showing 7 changed files with 344 additions and 56 deletions.
135 changes: 86 additions & 49 deletions crates/experimentation_platform/src/api/experiments/handlers.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
use std::{
collections::{HashMap, HashSet},
str::FromStr,
};
use std::collections::{HashMap, HashSet};

use actix_http::header::{self, HeaderMap, HeaderName, HeaderValue};
use actix_http::header::{self};
use actix_web::{
get, patch, post, put,
web::{self, Data, Json, Query},
Expand All @@ -15,20 +12,27 @@ use diesel::{
r2d2::{ConnectionManager, PooledConnection},
ExpressionMethods, PgConnection, QueryDsl, RunQueryDsl,
};

use service_utils::helpers::{
construct_request_headers, execute_webhook_call, generate_snowflake_id, request,
};

use reqwest::{Method, Response, StatusCode};
use serde_json::{json, Map, Value};
use service_utils::helpers::{construct_request_headers, generate_snowflake_id, request};
use service_utils::service::types::{
AppHeader, AppState, CustomHeaders, DbConnection, Tenant,
};
use superposition_macros::{bad_argument, response_error, unexpected_error};
use superposition_types::{result as superposition, Condition, Exp, Overrides, User};
use superposition_types::{
result::{self as superposition},
Condition, Exp, Overrides, TenantConfig, User, WebhookEvent,
};

use super::{
helpers::{
add_variant_dimension_to_ctx, check_variant_types,
check_variants_override_coverage, decide_variant, extract_override_keys,
validate_experiment, validate_override_keys,
fetch_cac_config, validate_experiment, validate_override_keys,
},
types::{
ApplicableVariantsQuery, AuditQueryFilters, ConcludeExperimentRequest,
Expand All @@ -38,9 +42,12 @@ use super::{
},
};

use crate::db::{
models::{EventLog, Experiment, ExperimentStatusType, Variant, Variants},
schema::{event_log::dsl as event_log, experiments::dsl as experiments},
use crate::{
api::experiments::helpers::construct_header_map,
db::{
models::{EventLog, Experiment, ExperimentStatusType, Variant, Variants},
schema::{event_log::dsl as event_log, experiments::dsl as experiments},
},
};

pub fn endpoints(scope: Scope) -> Scope {
Expand All @@ -55,33 +62,6 @@ pub fn endpoints(scope: Scope) -> Scope {
.service(update_overrides)
}

fn construct_header_map(
tenant: &str,
other_headers: Vec<(&str, String)>,
) -> superposition::Result<HeaderMap> {
let mut headers = HeaderMap::new();
let tenant_val = HeaderValue::from_str(tenant).map_err(|err| {
log::error!("failed to set header: {}", err);
unexpected_error!("Something went wrong")
})?;
headers.insert(HeaderName::from_static("x-tenant"), tenant_val);
for (header, value) in other_headers {
let header_name = HeaderName::from_str(header).map_err(|err| {
log::error!("failed to set header: {}", err);
unexpected_error!("Something went wrong")
})?;

HeaderValue::from_str(value.as_str())
.map(|header_val| headers.insert(header_name, header_val))
.map_err(|err| {
log::error!("failed to set header: {}", err);
unexpected_error!("Something went wrong")
})?;
}

Ok(headers)
}

fn add_config_version_to_header(
config_version: &Option<String>,
resp_builder: &mut HttpResponseBuilder,
Expand Down Expand Up @@ -151,6 +131,7 @@ async fn create(
db_conn: DbConnection,
tenant: Tenant,
user: User,
tenant_config: TenantConfig,
) -> superposition::Result<HttpResponse> {
use crate::db::schema::experiments::dsl::experiments;
let mut variants = req.variants.to_vec();
Expand Down Expand Up @@ -305,11 +286,23 @@ async fn create(
.get_results(&mut conn)?;

let inserted_experiment: Experiment = inserted_experiments.remove(0);
let response = ExperimentCreateResponse::from(inserted_experiment);
let response = ExperimentCreateResponse::from(inserted_experiment.clone());

let mut http_resp = HttpResponse::Ok();
add_config_version_to_header(&config_version_id, &mut http_resp);
Ok(http_resp.json(response))
match execute_webhook_call(
tenant_config.experiments_webhook_config,
inserted_experiment,
&config_version_id,
WebhookEvent::ExperimentCreated,
)
.await
{
Ok(_) => {
let mut http_resp = HttpResponse::Ok();
add_config_version_to_header(&config_version_id, &mut http_resp);
Ok(http_resp.json(response))
}
Err(webhook_error) => Err(webhook_error),
}
}

#[patch("/{experiment_id}/conclude")]
Expand All @@ -320,6 +313,7 @@ async fn conclude_handler(
req: web::Json<ConcludeExperimentRequest>,
db_conn: DbConnection,
tenant: Tenant,
tenant_config: TenantConfig,
user: User,
) -> superposition::Result<HttpResponse> {
let DbConnection(conn) = db_conn;
Expand All @@ -333,9 +327,24 @@ async fn conclude_handler(
user,
)
.await?;
let mut http_resp = HttpResponse::Ok();
add_config_version_to_header(&config_version_id, &mut http_resp);
Ok(http_resp.json(ExperimentResponse::from(response)))

match execute_webhook_call(
tenant_config.experiments_webhook_config,
response.clone(),
&config_version_id,
WebhookEvent::ExperimentConcluded,
)
.await
{
Ok(_) => {
let mut http_resp = HttpResponse::Ok();
add_config_version_to_header(&config_version_id, &mut http_resp);
Ok(http_resp.json(ExperimentResponse::from(response)))
}
Err(webhook_error) => Err(superposition::AppError::WebhookError(anyhow!(
webhook_error
))),
}
}

pub async fn conclude(
Expand Down Expand Up @@ -608,10 +617,13 @@ pub fn get_experiment(

#[patch("/{id}/ramp")]
async fn ramp(
data: Data<AppState>,
params: web::Path<i64>,
req: web::Json<RampRequest>,
db_conn: DbConnection,
user: User,
tenant: Tenant,
tenant_config: TenantConfig,
) -> superposition::Result<Json<ExperimentResponse>> {
let DbConnection(mut conn) = db_conn;
let exp_id = params.into_inner();
Expand Down Expand Up @@ -648,7 +660,19 @@ async fn ramp(
))
.get_result(&mut conn)?;

Ok(Json(ExperimentResponse::from(updated_experiment)))
let (_, config_version_id) = fetch_cac_config(tenant, data).await?;

match execute_webhook_call(
tenant_config.experiments_webhook_config,
updated_experiment.clone(),
&config_version_id,
WebhookEvent::ExperimentInprogress,
)
.await
{
Ok(_) => Ok(Json(ExperimentResponse::from(updated_experiment))),
Err(webhook_error) => Err(webhook_error),
}
}

#[put("/{id}/overrides")]
Expand All @@ -659,6 +683,7 @@ async fn update_overrides(
db_conn: DbConnection,
req: web::Json<OverrideKeysUpdateRequest>,
tenant: Tenant,
tenant_config: TenantConfig,
user: User,
) -> superposition::Result<HttpResponse> {
let DbConnection(mut conn) = db_conn;
Expand Down Expand Up @@ -872,9 +897,21 @@ async fn update_overrides(
))
.get_result::<Experiment>(&mut conn)?;

let mut http_resp = HttpResponse::Ok();
add_config_version_to_header(&config_version_id, &mut http_resp);
Ok(http_resp.json(ExperimentResponse::from(updated_experiment)))
match execute_webhook_call(
tenant_config.experiments_webhook_config,
updated_experiment.clone(),
&config_version_id,
WebhookEvent::ExperimentInprogress,
)
.await
{
Ok(_) => {
let mut http_resp = HttpResponse::Ok();
add_config_version_to_header(&config_version_id, &mut http_resp);
Ok(http_resp.json(ExperimentResponse::from(updated_experiment)))
}
Err(webhook_error) => Err(webhook_error),
}
}

#[get("/audit")]
Expand Down
72 changes: 70 additions & 2 deletions crates/experimentation_platform/src/api/experiments/helpers.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
use actix_http::header::{self, HeaderMap, HeaderName, HeaderValue};
use actix_web::web::Data;
use anyhow::anyhow;
use diesel::pg::PgConnection;
use diesel::{BoolExpressionMethods, ExpressionMethods, QueryDsl, RunQueryDsl};
use serde_json::{Map, Value};
use service_utils::helpers::extract_dimensions;
use service_utils::service::types::ExperimentationFlags;
use service_utils::service::types::{AppState, ExperimentationFlags, Tenant};
use std::collections::HashSet;
use std::str::FromStr;
use superposition_macros::{bad_argument, unexpected_error};
use superposition_types::{result as superposition, Condition, Exp, Overrides};
use superposition_types::{result as superposition, Condition, Config, Exp, Overrides};

use crate::db::models::{Experiment, ExperimentStatusType, Variant, VariantType};

Expand Down Expand Up @@ -268,3 +272,67 @@ pub fn decide_variant(

Ok(applicable_variants.get(index).cloned())
}

pub fn construct_header_map(
tenant: &str,
other_headers: Vec<(&str, String)>,
) -> superposition::Result<HeaderMap> {
let mut headers = HeaderMap::new();
let tenant_val = HeaderValue::from_str(tenant).map_err(|err| {
log::error!("failed to set header: {}", err);
unexpected_error!("Something went wrong")
})?;
headers.insert(HeaderName::from_static("x-tenant"), tenant_val);
for (header, value) in other_headers {
let header_name = HeaderName::from_str(header).map_err(|err| {
log::error!("failed to set header: {}", err);
unexpected_error!("Something went wrong")
})?;

HeaderValue::from_str(value.as_str())
.map(|header_val| headers.insert(header_name, header_val))
.map_err(|err| {
log::error!("failed to set header: {}", err);
unexpected_error!("Something went wrong")
})?;
}

Ok(headers)
}

pub async fn fetch_cac_config(
tenant: Tenant,
state: Data<AppState>,
) -> superposition::Result<(Config, Option<String>)> {
let http_client = reqwest::Client::new();
let url = state.cac_host.clone() + "/config";
let headers_map = construct_header_map(tenant.as_str(), Vec::new())?;

let response = http_client
.get(&url)
.headers(headers_map.into())
.header(
header::AUTHORIZATION,
format!("Internal {}", state.superposition_token),
)
.send()
.await;

match response {
Ok(res) => {
let config_version = res
.headers()
.get("x-config-version")
.and_then(|val| val.to_str().map_or(None, |v| Some(v.to_string())));
let config = res.json::<Config>().await.map_err(|err| {
log::error!("failed to parse cac config response with error: {}", err);
unexpected_error!("Failed to parse cac config.")
})?;
Ok((config, config_version))
}
Err(error) => {
log::error!("Failed to fetch cac config with error: {:?}", error);
Err(superposition::AppError::UnexpectedError(anyhow!(error)))
}
}
}
Loading

0 comments on commit de69732

Please sign in to comment.