Skip to content

Commit

Permalink
turn ProtocolInspectPolicy into AclDstHostRuleSet
Browse files Browse the repository at this point in the history
  • Loading branch information
GlenDC committed Oct 10, 2024
1 parent 90c721e commit c20fa4a
Show file tree
Hide file tree
Showing 40 changed files with 640 additions and 499 deletions.
11 changes: 8 additions & 3 deletions g3proxy/doc/configuration/values/dpi.rst
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,12 @@ The keys ars:
protocol inspect policy
-----------------------

**type**: string
**type**: string | map

Set what we should do to a specific application protocol.

The possible value are:
One can use the *string* type to define an action for any upstream traffic, regardless of the host,
the possible values for this are:

- intercept

Expand All @@ -89,7 +90,11 @@ The possible value are:

Block the traffic. And we will try to send application level error code to the client.

.. versionadded:: 1.9.0
For more complex setups one can also use the *map* type which
is documented in :ref:`acl rule set <conf_value_acl_rule_set>` with the only
difference that the action variants are the strings defined here.

.. versionadded:: 1.11.0

.. _conf_value_dpi_protocol_inspection:

Expand Down
16 changes: 8 additions & 8 deletions g3proxy/src/audit/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ impl AuditHandle {
}

#[inline]
pub(crate) fn h2_inspect_policy(&self) -> ProtocolInspectPolicy {
self.auditor_config.h2_inspect_policy
pub(crate) fn h2_inspect_policy(&self) -> &ProtocolInspectPolicy {
&self.auditor_config.h2_inspect_policy
}

#[inline]
Expand All @@ -123,13 +123,13 @@ impl AuditHandle {
}

#[inline]
pub(crate) fn websocket_inspect_policy(&self) -> ProtocolInspectPolicy {
self.auditor_config.websocket_inspect_policy
pub(crate) fn websocket_inspect_policy(&self) -> &ProtocolInspectPolicy {
&self.auditor_config.websocket_inspect_policy
}

#[inline]
pub(crate) fn smtp_inspect_policy(&self) -> ProtocolInspectPolicy {
self.auditor_config.smtp_inspect_policy
pub(crate) fn smtp_inspect_policy(&self) -> &ProtocolInspectPolicy {
&self.auditor_config.smtp_inspect_policy
}

#[inline]
Expand All @@ -138,8 +138,8 @@ impl AuditHandle {
}

#[inline]
pub(crate) fn imap_inspect_policy(&self) -> ProtocolInspectPolicy {
self.auditor_config.imap_inspect_policy
pub(crate) fn imap_inspect_policy(&self) -> &ProtocolInspectPolicy {
&self.auditor_config.imap_inspect_policy
}

#[inline]
Expand Down
8 changes: 4 additions & 4 deletions g3proxy/src/auth/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use governor::{clock::DefaultClock, state::InMemoryState, state::NotKeyed, RateL
use tokio::time::Instant;

use g3_io_ext::{GlobalDatagramLimiter, GlobalLimitGroup, GlobalStreamLimiter};
use g3_types::acl::{AclAction, AclNetworkRule};
use g3_types::acl::{AclAction, AclNetworkRule, ActionContract};
use g3_types::acl_set::AclDstHostRuleSet;
use g3_types::auth::UserAuthError;
use g3_types::limit::{GaugeSemaphore, GaugeSemaphorePermit};
Expand Down Expand Up @@ -605,7 +605,7 @@ impl User {
forbid_stats.add_dest_denied();
return action;
};
default_action = default_action.restrict(action);
default_action = default_action.restrict(&action);
}

if let Some(filter) = &self.dst_host_filter {
Expand All @@ -614,7 +614,7 @@ impl User {
forbid_stats.add_dest_denied();
return action;
}
default_action = default_action.restrict(action);
default_action = default_action.restrict(&action);
}

if default_action.forbid_early() {
Expand All @@ -636,7 +636,7 @@ impl User {
forbid_stats.add_ua_blocked();
return Some(action);
}
default_action = default_action.restrict(action);
default_action = default_action.restrict(&action);
}
}
Some(default_action)
Expand Down
20 changes: 16 additions & 4 deletions g3proxy/src/config/audit/auditor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,24 @@ impl AuditorConfig {
tls_stream_dump: None,
log_uri_max_chars: 1024,
h1_interception: Default::default(),
h2_inspect_policy: ProtocolInspectPolicy::Intercept,
h2_inspect_policy: ProtocolInspectPolicy::builder_with_missing_action(
g3_dpi::ProtocolInspectAction::Intercept,
)
.build(),
h2_interception: Default::default(),
websocket_inspect_policy: ProtocolInspectPolicy::Intercept,
smtp_inspect_policy: ProtocolInspectPolicy::Intercept,
websocket_inspect_policy: ProtocolInspectPolicy::builder_with_missing_action(
g3_dpi::ProtocolInspectAction::Intercept,
)
.build(),
smtp_inspect_policy: ProtocolInspectPolicy::builder_with_missing_action(
g3_dpi::ProtocolInspectAction::Intercept,
)
.build(),
smtp_interception: Default::default(),
imap_inspect_policy: ProtocolInspectPolicy::Intercept,
imap_inspect_policy: ProtocolInspectPolicy::builder_with_missing_action(
g3_dpi::ProtocolInspectAction::Intercept,
)
.build(),
imap_interception: Default::default(),
icap_reqmod_service: None,
icap_respmod_service: None,
Expand Down
13 changes: 11 additions & 2 deletions g3proxy/src/inspect/http/v1/upgrade/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use slog::slog_info;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio::time::Instant;

use g3_dpi::Protocol;
use g3_dpi::{Protocol, ProtocolInspectAction};
use g3_http::client::HttpTransparentResponse;
use g3_http::server::{HttpTransparentRequest, UriExt};
use g3_http::{HttpBodyReader, HttpBodyType};
Expand Down Expand Up @@ -153,9 +153,18 @@ where
where
CW: AsyncWrite + Unpin,
{
let policy_action = match self.req.host.as_ref() {
Some(upstream) => {
let (_, policy_action) = self.ctx.websocket_inspect_policy().check(upstream.host());
policy_action
}
None => self.ctx.websocket_inspect_policy().missing_action(),
};
let block_websocket = policy_action == ProtocolInspectAction::Block;

let upgrade_token_count = self.req.retain_upgrade(|p| {
if matches!(p, HttpUpgradeToken::Websocket) {
return !self.ctx.websocket_inspect_policy().is_block();
return !block_websocket;
}
if matches!(p, HttpUpgradeToken::ConnectIp) {
return false;
Expand Down
11 changes: 9 additions & 2 deletions g3proxy/src/inspect/http/v2/connect/extended.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use h2::{RecvStream, StreamId};
use http::{header, Request, Response, StatusCode, Version};
use slog::slog_info;

use g3_dpi::Protocol;
use g3_dpi::{Protocol, ProtocolInspectAction};
use g3_h2::{H2StreamReader, H2StreamWriter};
use g3_http::server::UriExt;
use g3_slog_types::{LtDateTime, LtDuration, LtH2StreamId, LtUpstreamAddr, LtUuid};
Expand Down Expand Up @@ -178,7 +178,14 @@ where
}
};

if self.ctx.websocket_inspect_policy().is_block() {
let policy_action = match self.upstream.as_ref() {
Some(upstream) => {
let (_, policy_action) = self.ctx.websocket_inspect_policy().check(upstream.host());
policy_action
}
None => self.ctx.websocket_inspect_policy().missing_action(),
};
if policy_action == ProtocolInspectAction::Block {
self.reply_forbidden(clt_send_rsp);
intercept_log!(self, "websocket blocked by inspection policy");
return;
Expand Down
13 changes: 7 additions & 6 deletions g3proxy/src/inspect/http/v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use slog::slog_info;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio::time::Instant;

use g3_dpi::{Protocol, ProtocolInspectPolicy};
use g3_dpi::{Protocol, ProtocolInspectAction};
use g3_h2::H2BodyTransfer;
use g3_io_ext::OnceBufReader;
use g3_slog_types::{LtUpstreamAddr, LtUuid};
Expand Down Expand Up @@ -110,15 +110,16 @@ where
SC: ServerConfig + Send + Sync + 'static,
{
pub(crate) async fn intercept(mut self) -> ServerTaskResult<()> {
let r = match self.ctx.h2_inspect_policy() {
ProtocolInspectPolicy::Intercept => self
let (_, inspect_action) = self.ctx.h2_inspect_policy().check(self.upstream.host());
let r = match inspect_action {
ProtocolInspectAction::Intercept => self
.do_intercept()
.await
.map_err(|e| InterceptionError::H2(e).into_server_task_error(Protocol::Http2)),
#[cfg(feature = "quic")]
ProtocolInspectPolicy::Detour => self.do_detour().await,
ProtocolInspectPolicy::Bypass => self.do_bypass().await,
ProtocolInspectPolicy::Block => self
ProtocolInspectAction::Detour => self.do_detour().await,
ProtocolInspectAction::Bypass => self.do_bypass().await,
ProtocolInspectAction::Block => self
.do_block()
.await
.map_err(|e| InterceptionError::H2(e).into_server_task_error(Protocol::Http2)),
Expand Down
13 changes: 7 additions & 6 deletions g3proxy/src/inspect/imap/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use anyhow::anyhow;
use slog::slog_info;
use tokio::io::AsyncWriteExt;

use g3_dpi::ProtocolInspectPolicy;
use g3_dpi::ProtocolInspectAction;
use g3_imap_proto::response::ByeResponse;
use g3_imap_proto::CommandPipeline;
use g3_io_ext::{LineRecvVec, OnceBufReader};
Expand Down Expand Up @@ -130,12 +130,13 @@ where
}

pub(crate) async fn intercept(mut self) -> ServerTaskResult<Option<StreamInspection<SC>>> {
let r = match self.ctx.imap_inspect_policy() {
ProtocolInspectPolicy::Intercept => self.do_intercept().await,
let (_, inspect_action) = self.ctx.imap_inspect_policy().check(self.upstream.host());
let r = match inspect_action {
ProtocolInspectAction::Intercept => self.do_intercept().await,
#[cfg(feature = "quic")]
ProtocolInspectPolicy::Detour => self.do_detour().await.map(|_| None),
ProtocolInspectPolicy::Bypass => self.do_bypass().await.map(|_| None),
ProtocolInspectPolicy::Block => self.do_block().await.map(|_| None),
ProtocolInspectAction::Detour => self.do_detour().await.map(|_| None),
ProtocolInspectAction::Bypass => self.do_bypass().await.map(|_| None),
ProtocolInspectAction::Block => self.do_block().await.map(|_| None),
};
match r {
Ok(obj) => {
Expand Down
8 changes: 4 additions & 4 deletions g3proxy/src/inspect/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ impl<SC: ServerConfig> StreamInspectContext<SC> {
}

#[inline]
fn h2_inspect_policy(&self) -> ProtocolInspectPolicy {
fn h2_inspect_policy(&self) -> &ProtocolInspectPolicy {
self.audit_handle.h2_inspect_policy()
}

Expand All @@ -281,12 +281,12 @@ impl<SC: ServerConfig> StreamInspectContext<SC> {
}

#[inline]
fn websocket_inspect_policy(&self) -> ProtocolInspectPolicy {
fn websocket_inspect_policy(&self) -> &ProtocolInspectPolicy {
self.audit_handle.websocket_inspect_policy()
}

#[inline]
fn smtp_inspect_policy(&self) -> ProtocolInspectPolicy {
fn smtp_inspect_policy(&self) -> &ProtocolInspectPolicy {
self.audit_handle.smtp_inspect_policy()
}

Expand All @@ -296,7 +296,7 @@ impl<SC: ServerConfig> StreamInspectContext<SC> {
}

#[inline]
fn imap_inspect_policy(&self) -> ProtocolInspectPolicy {
fn imap_inspect_policy(&self) -> &ProtocolInspectPolicy {
self.audit_handle.imap_inspect_policy()
}

Expand Down
13 changes: 7 additions & 6 deletions g3proxy/src/inspect/smtp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use anyhow::anyhow;
use slog::slog_info;
use tokio::io::AsyncWriteExt;

use g3_dpi::ProtocolInspectPolicy;
use g3_dpi::ProtocolInspectAction;
use g3_io_ext::{LineRecvBuf, OnceBufReader};
use g3_slog_types::{LtHost, LtUpstreamAddr, LtUuid};
use g3_smtp_proto::command::Command;
Expand Down Expand Up @@ -121,12 +121,13 @@ where
}

pub(crate) async fn intercept(mut self) -> ServerTaskResult<Option<StreamInspection<SC>>> {
let r = match self.ctx.smtp_inspect_policy() {
ProtocolInspectPolicy::Intercept => self.do_intercept().await,
let (_, inspect_action) = self.ctx.smtp_inspect_policy().check(self.upstream.host());
let r = match inspect_action {
ProtocolInspectAction::Intercept => self.do_intercept().await,
#[cfg(feature = "quic")]
ProtocolInspectPolicy::Detour => self.do_detour().await.map(|_| None),
ProtocolInspectPolicy::Bypass => self.do_bypass().await.map(|_| None),
ProtocolInspectPolicy::Block => self.do_block().await.map(|_| None),
ProtocolInspectAction::Detour => self.do_detour().await.map(|_| None),
ProtocolInspectAction::Bypass => self.do_bypass().await.map(|_| None),
ProtocolInspectAction::Block => self.do_block().await.map(|_| None),
};
match r {
Ok(obj) => {
Expand Down
11 changes: 7 additions & 4 deletions g3proxy/src/inspect/tls/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use tokio::io::{AsyncRead, AsyncWrite};
use tokio::runtime::Handle;

use g3_cert_agent::CertAgentHandle;
use g3_dpi::Protocol;
use g3_dpi::{Protocol, ProtocolInspectAction};
use g3_io_ext::{AsyncStream, FlexBufReader, OnceBufReader};
use g3_slog_types::{LtUpstreamAddr, LtUuid};
use g3_types::net::{
Expand Down Expand Up @@ -167,11 +167,14 @@ impl<SC: ServerConfig> TlsInterceptObject<SC> {

fn retain_alpn_protocol(&self, p: &[u8]) -> bool {
if p == AlpnProtocol::Http2.identification_sequence() {
return !self.ctx.h2_inspect_policy().is_block();
let (_, inspect_policy) = self.ctx.h2_inspect_policy().check(self.upstream.host());
return inspect_policy != ProtocolInspectAction::Block;
} else if p == AlpnProtocol::Smtp.identification_sequence() {
return !self.ctx.smtp_inspect_policy().is_block();
let (_, inspect_policy) = self.ctx.smtp_inspect_policy().check(self.upstream.host());
return inspect_policy != ProtocolInspectAction::Block;
} else if p == AlpnProtocol::Imap.identification_sequence() {
return !self.ctx.imap_inspect_policy().is_block();
let (_, inspect_policy) = self.ctx.imap_inspect_policy().check(self.upstream.host());
return inspect_policy != ProtocolInspectAction::Block;
}
true
}
Expand Down
16 changes: 10 additions & 6 deletions g3proxy/src/inspect/websocket/h1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use anyhow::anyhow;
use slog::slog_info;
use tokio::io::AsyncWriteExt;

use g3_dpi::ProtocolInspectPolicy;
use g3_dpi::ProtocolInspectAction;
use g3_io_ext::LimitedWriteExt;
use g3_slog_types::{LtHttpHeaderValue, LtUpstreamAddr, LtUuid};
use g3_types::net::{UpstreamAddr, WebSocketNotes};
Expand Down Expand Up @@ -90,12 +90,16 @@ impl<SC: ServerConfig> H1WebsocketInterceptObject<SC> {
}

pub(crate) async fn intercept(mut self) -> ServerTaskResult<()> {
let r = match self.ctx.websocket_inspect_policy() {
ProtocolInspectPolicy::Intercept => self.do_intercept().await,
let (_, inspect_action) = self
.ctx
.websocket_inspect_policy()
.check(self.upstream.host());
let r = match inspect_action {
ProtocolInspectAction::Intercept => self.do_intercept().await,
#[cfg(feature = "quic")]
ProtocolInspectPolicy::Detour => self.do_detour().await,
ProtocolInspectPolicy::Bypass => self.do_bypass().await,
ProtocolInspectPolicy::Block => self.do_block().await,
ProtocolInspectAction::Detour => self.do_detour().await,
ProtocolInspectAction::Bypass => self.do_bypass().await,
ProtocolInspectAction::Block => self.do_block().await,
};
match r {
Ok(_) => {
Expand Down
16 changes: 10 additions & 6 deletions g3proxy/src/inspect/websocket/h2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use bytes::Bytes;
use h2::{RecvStream, SendStream};
use slog::slog_info;

use g3_dpi::ProtocolInspectPolicy;
use g3_dpi::ProtocolInspectAction;
use g3_h2::{H2StreamReader, H2StreamWriter};
use g3_slog_types::{LtHttpHeaderValue, LtUpstreamAddr, LtUuid};
use g3_types::net::{UpstreamAddr, WebSocketNotes};
Expand Down Expand Up @@ -74,12 +74,16 @@ impl<SC: ServerConfig> H2WebsocketInterceptObject<SC> {
ups_r: RecvStream,
ups_w: SendStream<Bytes>,
) {
let r = match self.ctx.websocket_inspect_policy() {
ProtocolInspectPolicy::Intercept => self.do_intercept(clt_r, clt_w, ups_r, ups_w).await,
let (_, inspect_action) = self
.ctx
.websocket_inspect_policy()
.check(self.upstream.host());
let r = match inspect_action {
ProtocolInspectAction::Intercept => self.do_intercept(clt_r, clt_w, ups_r, ups_w).await,
#[cfg(feature = "quic")]
ProtocolInspectPolicy::Detour => self.do_detour(clt_r, clt_w, ups_r, ups_w).await,
ProtocolInspectPolicy::Bypass => self.do_bypass(clt_r, clt_w, ups_r, ups_w).await,
ProtocolInspectPolicy::Block => self.do_block(clt_w, ups_w).await,
ProtocolInspectAction::Detour => self.do_detour(clt_r, clt_w, ups_r, ups_w).await,
ProtocolInspectAction::Bypass => self.do_bypass(clt_r, clt_w, ups_r, ups_w).await,
ProtocolInspectAction::Block => self.do_block(clt_w, ups_w).await,
};
match r {
Ok(_) => {
Expand Down
Loading

0 comments on commit c20fa4a

Please sign in to comment.