Skip to content

Commit

Permalink
handle delete of partition queues (#36)
Browse files Browse the repository at this point in the history
* drop_queue without pg_partman

* add comment
  • Loading branch information
ChuckHend authored Aug 9, 2023
1 parent aaaf47e commit 9b48f61
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 10 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pgmq"
version = "0.11.0"
version = "0.11.1"
edition = "2021"
authors = ["Tembo.io"]
description = "Postgres extension for PGMQ"
Expand Down
20 changes: 15 additions & 5 deletions src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,28 @@ use crate::partition::PARTMAN_SCHEMA;
use pgmq_crate::query::{PGMQ_SCHEMA, TABLE_PREFIX};

#[pg_extern]
fn pgmq_drop_queue(queue_name: String) -> Result<bool, spi::Error> {
delete_queue(queue_name)?;
fn pgmq_drop_queue(
queue_name: String,
partitioned: default!(bool, false),
) -> Result<bool, spi::Error> {
delete_queue(queue_name, partitioned)?;
Ok(true)
}

pub fn delete_queue(queue_name: String) -> Result<(), spi::Error> {
pub fn delete_queue(queue_name: String, partitioned: bool) -> Result<(), spi::Error> {
// TODO: we should keep track whether queue is partitioned in pgmq_meta
// then read that to determine we want to delete the part_config entries
// this should go out before 1.0
let queue_table = format!("{PGMQ_SCHEMA}.{TABLE_PREFIX}_{queue_name}");
let queries = vec![
let mut queries = vec![
format!("DELETE from {PGMQ_SCHEMA}.pgmq_meta WHERE queue_name = '{queue_name}';"),
format!("DROP TABLE {queue_table};"),
format!("DELETE FROM {PARTMAN_SCHEMA}.part_config where parent_table = '{queue_table}';"),
];
if partitioned {
queries.push(format!(
"DELETE FROM {PARTMAN_SCHEMA}.part_config where parent_table = '{queue_table}';"
))
}
let _: Result<(), spi::Error> = Spi::connect(|mut client| {
for q in queries {
client.update(q.as_str(), None, None)?;
Expand Down
13 changes: 12 additions & 1 deletion tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,16 +147,27 @@ async fn test_lifecycle() {
struct QueueMeta {
queue_name: String,
}

// delete partitioned queues
for queue in [test_duration_queue, test_numeric_queue].iter() {
sqlx::query(&format!("select pgmq_drop_queue('{}', true);", &queue))
.execute(&conn)
.await
.expect("failed to drop partitioned queues");
}

let queues = sqlx::query_as::<_, QueueMeta>("select queue_name from pgmq_list_queues();")
.fetch_all(&conn)
.await
.expect("failed to list queues");

// drop the rest of the queues
for queue in queues {
let q = queue.queue_name;
sqlx::query(&format!("select pgmq_drop_queue('{}');", &q))
.execute(&conn)
.await
.expect("failed to list queues");
.expect("failed to drop standard queues");
}

let queues = sqlx::query_as::<_, QueueMeta>("select queue_name from pgmq_list_queues();")
Expand Down

0 comments on commit 9b48f61

Please sign in to comment.