Skip to content

Commit

Permalink
Add unlogged queues in extension (#123)
Browse files Browse the repository at this point in the history
  • Loading branch information
v0idpwn authored Sep 26, 2023
1 parent 17aa5a4 commit d874a49
Show file tree
Hide file tree
Showing 9 changed files with 80 additions and 16 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pgmq"
version = "0.27.0"
version = "0.28.0"
edition = "2021"
authors = ["Tembo.io"]
description = "Postgres extension for PGMQ"
Expand Down
2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pgmq-core"
version = "0.5.0"
version = "0.6.0"
edition = "2021"
authors = ["Tembo.io"]
description = "Core functionality shared between the PGMQ Rust SDK and Postgres Extension"
Expand Down
31 changes: 22 additions & 9 deletions core/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,16 @@ use crate::{

use sqlx::types::chrono::Utc;

pub fn init_queue(name: &str) -> Result<Vec<String>, PgmqError> {
pub fn init_queue(name: &str, is_unlogged: bool) -> Result<Vec<String>, PgmqError> {
let name = CheckedName::new(name)?;
Ok(vec![
create_queue(name)?,
create_queue(name, is_unlogged)?,
assign_queue(name)?,
create_index(name)?,
create_archive(name)?,
assign_archive(name)?,
create_archive_index(name)?,
insert_meta(name, false)?,
insert_meta(name, false, is_unlogged)?,
grant_pgmon_queue(name)?,
])
}
Expand All @@ -34,10 +34,12 @@ pub fn destroy_queue(name: &str) -> Result<Vec<String>, PgmqError> {
])
}

pub fn create_queue(name: CheckedName<'_>) -> Result<String, PgmqError> {
pub fn create_queue(name: CheckedName<'_>, is_unlogged: bool) -> Result<String, PgmqError> {
let maybe_unlogged = if is_unlogged { "UNLOGGED" } else { "" };

Ok(format!(
"
CREATE TABLE IF NOT EXISTS {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name} (
CREATE {maybe_unlogged} TABLE IF NOT EXISTS {PGMQ_SCHEMA}.{QUEUE_PREFIX}_{name} (
msg_id BIGSERIAL PRIMARY KEY,
read_ct INT DEFAULT 0 NOT NULL,
enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
Expand Down Expand Up @@ -155,11 +157,15 @@ pub fn drop_queue_archive(name: CheckedName<'_>) -> Result<String, PgmqError> {
))
}

pub fn insert_meta(name: CheckedName<'_>, is_partitioned: bool) -> Result<String, PgmqError> {
pub fn insert_meta(
name: CheckedName<'_>,
is_partitioned: bool,
is_unlogged: bool,
) -> Result<String, PgmqError> {
Ok(format!(
"
INSERT INTO {PGMQ_SCHEMA}.meta (queue_name, is_partitioned)
VALUES ('{name}', {is_partitioned})
INSERT INTO {PGMQ_SCHEMA}.meta (queue_name, is_partitioned, is_unlogged)
VALUES ('{name}', {is_partitioned}, {is_unlogged})
ON CONFLICT
DO NOTHING;
",
Expand Down Expand Up @@ -391,10 +397,17 @@ $$ LANGUAGE plpgsql;
#[test]
fn test_create() {
let queue_name = CheckedName::new("yolo").unwrap();
let query = create_queue(queue_name);
let query = create_queue(queue_name, false);
assert!(query.unwrap().contains("q_yolo"));
}

#[test]
fn create_unlogged() {
let queue_name = CheckedName::new("yolo").unwrap();
let query = create_queue(queue_name, true);
assert!(query.unwrap().contains("CREATE UNLOGGED TABLE"));
}

#[test]
fn test_enqueue() {
let mut msgs: Vec<serde_json::Value> = Vec::new();
Expand Down
3 changes: 3 additions & 0 deletions sql/pgmq--0.27.0--0.28.0.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ALTER TABLE pgmq.meta ADD COLUMN is_unlogged BOOLEAN;
UPDATE pgmq.meta SET is_unlogged = false;
ALTER TABLE pgmq.meta ALTER COLUMN is_unlogged SET NOT NULL;
14 changes: 13 additions & 1 deletion src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,19 @@ fn pgmq_purge_queue(queue_name: String) -> Result<i64, PgmqExtError> {

#[pg_extern(name = "create_non_partitioned")]
fn pgmq_create_non_partitioned(queue_name: &str) -> Result<(), PgmqExtError> {
let setup = init_queue(queue_name)?;
let setup = init_queue(queue_name, false)?;
let ran: Result<_, spi::Error> = Spi::connect(|mut c| {
for q in setup {
let _ = c.update(&q, None, None)?;
}
Ok(())
});
Ok(ran?)
}

#[pg_extern(name = "create_unlogged")]
fn pgmq_create_unlogged(queue_name: &str) -> Result<(), PgmqExtError> {
let setup = init_queue(queue_name, true)?;
let ran: Result<_, spi::Error> = Spi::connect(|mut c| {
for q in setup {
let _ = c.update(&q, None, None)?;
Expand Down
4 changes: 2 additions & 2 deletions src/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub fn init_partitioned_queue(
create_archive_index(name)?,
assign_archive(name)?,
create_partitioned_table(name, partition_col, partition_interval)?,
insert_meta(name, true)?,
insert_meta(name, true, false)?,
set_retention_config(name, retention_interval)?,
grant_pgmon_queue(name)?,
grant_pgmon_queue_seq(name)?,
Expand All @@ -57,7 +57,7 @@ pub fn init_partitioned_queue_client_only(
create_archive_index(name)?,
assign_archive(name)?,
create_partitioned_table(name, partition_col, partition_interval)?,
insert_meta(name, true)?,
insert_meta(name, true, false)?,
set_retention_config(name, retention_interval)?,
grant_pgmon_queue(name)?,
grant_pgmon_queue_seq(name)?,
Expand Down
1 change: 1 addition & 0 deletions src/sql_src.sql
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
CREATE TABLE pgmq.meta (
queue_name VARCHAR UNIQUE NOT NULL,
is_partitioned BOOLEAN NOT NULL,
is_unlogged BOOLEAN NOT NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL
);

Expand Down
35 changes: 35 additions & 0 deletions tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,41 @@ async fn init_database() -> Pool<Postgres> {
conn
}

// Integration tests are ignored by default
#[ignore]
#[tokio::test]
async fn test_unlogged() {
let conn = init_database().await;
let mut rng = rand::thread_rng();
let test_num = rng.gen_range(0..100000);

// CREATE with default retention and partition strategy
let test_queue = format!("test_unlogged_{test_num}");
let _ = sqlx::query(&format!(
"SELECT {PGMQ_SCHEMA}.create_unlogged('{test_queue}');"
))
.execute(&conn)
.await
.expect("failed to create queue");

let msg_id = sqlx::query(&format!(
"SELECT * from {PGMQ_SCHEMA}.send('{test_queue}', '{{\"hello\": \"world\"}}');"
))
.fetch_one(&conn)
.await
.expect("failed send")
.get::<i64, usize>(0);
assert_eq!(msg_id, 1);

let query = &format!("SELECT * from {PGMQ_SCHEMA}.read('{test_queue}', 2, 1);");

let message = fetch_one_message::<serde_json::Value>(query, &conn)
.await
.expect("failed reading message")
.expect("expected message");
assert_eq!(message.msg_id, msg_id);
}

// Integration tests are ignored by default
#[ignore]
#[tokio::test]
Expand Down

0 comments on commit d874a49

Please sign in to comment.