diff --git a/Cargo.lock b/Cargo.lock index 5511a41..6ac2508 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1243,10 +1243,10 @@ dependencies = [ [[package]] name = "pgmq" -version = "0.11.0" +version = "0.11.1" dependencies = [ "chrono", - "pgmq 0.14.2", + "pgmq 0.14.3", "pgrx", "pgrx-tests", "rand", @@ -1260,7 +1260,7 @@ dependencies = [ [[package]] name = "pgmq" -version = "0.14.2" +version = "0.14.3" dependencies = [ "chrono", "log", diff --git a/Cargo.toml b/Cargo.toml index eca45b9..52bb1fe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pgmq" -version = "0.11.0" +version = "0.11.1" edition = "2021" authors = ["Tembo.io"] description = "Postgres extension for PGMQ" diff --git a/src/api.rs b/src/api.rs index 9b45e66..ce430d3 100644 --- a/src/api.rs +++ b/src/api.rs @@ -6,18 +6,28 @@ use crate::partition::PARTMAN_SCHEMA; use pgmq_crate::query::{PGMQ_SCHEMA, TABLE_PREFIX}; #[pg_extern] -fn pgmq_drop_queue(queue_name: String) -> Result { - delete_queue(queue_name)?; +fn pgmq_drop_queue( + queue_name: String, + partitioned: default!(bool, false), +) -> Result { + delete_queue(queue_name, partitioned)?; Ok(true) } -pub fn delete_queue(queue_name: String) -> Result<(), spi::Error> { +pub fn delete_queue(queue_name: String, partitioned: bool) -> Result<(), spi::Error> { + // TODO: we should keep track whether queue is partitioned in pgmq_meta + // then read that to determine we want to delete the part_config entries + // this should go out before 1.0 let queue_table = format!("{PGMQ_SCHEMA}.{TABLE_PREFIX}_{queue_name}"); - let queries = vec![ + let mut queries = vec![ format!("DELETE from {PGMQ_SCHEMA}.pgmq_meta WHERE queue_name = '{queue_name}';"), format!("DROP TABLE {queue_table};"), - format!("DELETE FROM {PARTMAN_SCHEMA}.part_config where parent_table = '{queue_table}';"), ]; + if partitioned { + queries.push(format!( + "DELETE FROM {PARTMAN_SCHEMA}.part_config where parent_table = '{queue_table}';" + )) + } let _: Result<(), spi::Error> = Spi::connect(|mut client| { for q in queries { client.update(q.as_str(), None, None)?; diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 955e544..be26b3d 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -147,16 +147,27 @@ async fn test_lifecycle() { struct QueueMeta { queue_name: String, } + + // delete partitioned queues + for queue in [test_duration_queue, test_numeric_queue].iter() { + sqlx::query(&format!("select pgmq_drop_queue('{}', true);", &queue)) + .execute(&conn) + .await + .expect("failed to drop partitioned queues"); + } + let queues = sqlx::query_as::<_, QueueMeta>("select queue_name from pgmq_list_queues();") .fetch_all(&conn) .await .expect("failed to list queues"); + + // drop the rest of the queues for queue in queues { let q = queue.queue_name; sqlx::query(&format!("select pgmq_drop_queue('{}');", &q)) .execute(&conn) .await - .expect("failed to list queues"); + .expect("failed to drop standard queues"); } let queues = sqlx::query_as::<_, QueueMeta>("select queue_name from pgmq_list_queues();")