Skip to content

Commit

Permalink
Add partitioned column to pgmq_meta (#97)
Browse files Browse the repository at this point in the history
* Add 'partitioned' attribute to pgmq_meta

* Use strum for Display

* Use boolean column

* add migration

* update migration

* Update pgmq--0.21.0--0.22.0.sql
  • Loading branch information
craigpangea authored Sep 3, 2023
1 parent f8ae3e8 commit 25cd795
Show file tree
Hide file tree
Showing 8 changed files with 41 additions and 12 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 8 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pgmq"
version = "0.21.0"
version = "0.22.0"
edition = "2021"
authors = ["Tembo.io"]
description = "Postgres extension for PGMQ"
Expand All @@ -25,15 +25,19 @@ pg_test = []
[dependencies]
pgrx = "0.9.8"
serde = "1.0.152"
pgmq_core = {package = "pgmq-core", path = "./core"}
pgmq_core = { package = "pgmq-core", path = "./core" }
serde_json = "1.0.91"
thiserror = "1.0.38"

[dev-dependencies]
pgrx-tests = "0.9.8"
chrono = { version = "0.4.23", features = [ "serde" ] }
chrono = { version = "0.4.23", features = ["serde"] }
rand = "0.8.5"
sqlx = { version = "0.6", features = [ "runtime-tokio-native-tls" , "postgres", "chrono" ] }
sqlx = { version = "0.6", features = [
"runtime-tokio-native-tls",
"postgres",
"chrono",
] }
tokio = { version = "1", features = ["macros"] }
whoami = "1.4.0"

Expand Down
10 changes: 5 additions & 5 deletions core/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub fn init_queue(name: &str) -> Result<Vec<String>, PgmqError> {
create_archive(name)?,
assign_archive(name)?,
create_archive_index(name)?,
insert_meta(name)?,
insert_meta(name, false)?,
grant_pgmon_queue(name)?,
])
}
Expand Down Expand Up @@ -150,14 +150,14 @@ pub fn drop_queue_archive(name: CheckedName<'_>) -> Result<String, PgmqError> {
))
}

pub fn insert_meta(name: CheckedName<'_>) -> Result<String, PgmqError> {
pub fn insert_meta(name: CheckedName<'_>, is_partitioned: bool) -> Result<String, PgmqError> {
Ok(format!(
"
INSERT INTO {PGMQ_SCHEMA}.{TABLE_PREFIX}_meta (queue_name)
VALUES ('{name}')
INSERT INTO {PGMQ_SCHEMA}.{TABLE_PREFIX}_meta (queue_name, is_partitioned)
VALUES ('{name}', '{is_partitioned}')
ON CONFLICT
DO NOTHING;
"
",
))
}

Expand Down
1 change: 1 addition & 0 deletions core/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub const PGMQ_SCHEMA: &str = "public";

pub struct PGMQueueMeta {
pub queue_name: String,
pub is_partitioned: bool,
pub created_at: DateTime<Utc>,
}

Expand Down
Empty file added sql/pgmq--0.20.0--0.21.0.sql
Empty file.
23 changes: 23 additions & 0 deletions sql/pgmq--0.21.0--0.22.0.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
DO $$
DECLARE
qname TEXT;
BEGIN
ALTER TABLE pgmq_meta ADD COLUMN is_partitioned BOOLEAN;

IF 'pg_partman' NOT IN (SELECT extname FROM pg_extension) THEN
UPDATE pgmq_meta SET is_partitioned = false;
ELSE
FOR qname IN (SELECT queue_name FROM pgmq_meta)
LOOP
-- If qname is in public.part_config it must be a partitioned table.
IF format('public.pgmq_%1$I', qname) IN (SELECT parent_table FROM public.part_config) THEN
UPDATE pgmq_meta SET is_partitioned = true WHERE queue_name = qname;
ELSE
UPDATE pgmq_meta SET is_partitioned = false WHERE queue_name = qname;
END IF;
END LOOP;
END IF;

ALTER TABLE pgmq_meta ALTER COLUMN is_partitioned SET NOT NULL;
END $$;

1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ extension_sql!(
"
CREATE TABLE public.pgmq_meta (
queue_name VARCHAR UNIQUE NOT NULL,
is_partitioned BOOLEAN NOT NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL
);
Expand Down
4 changes: 2 additions & 2 deletions src/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub fn init_partitioned_queue(
create_archive(name)?,
assign_archive(name)?,
create_partitioned_table(name, partition_col, partition_interval)?,
insert_meta(name)?,
insert_meta(name, true)?,
set_retention_config(name, retention_interval)?,
grant_pgmon_queue(name)?,
grant_pgmon_queue_seq(name)?,
Expand All @@ -55,7 +55,7 @@ pub fn init_partitioned_queue_client_only(
create_archive(name)?,
assign_archive(name)?,
create_partitioned_table(name, partition_col, partition_interval)?,
insert_meta(name)?,
insert_meta(name, true)?,
set_retention_config(name, retention_interval)?,
grant_pgmon_queue(name)?,
grant_pgmon_queue_seq(name)?,
Expand Down

0 comments on commit 25cd795

Please sign in to comment.