add maintenance code for tx tables
gunplar committed Aug 4, 2023
1 parent 5cc64ee commit 05bdd70
Showing 2 changed files with 81 additions and 33 deletions.
@@ -22,7 +22,6 @@
import static com.here.naksha.lib.core.exceptions.UncheckedException.unchecked;
import static com.here.naksha.lib.psql.SQL.escapeId;

import com.here.naksha.lib.core.NakshaAdminCollection;
import com.here.naksha.lib.core.NakshaVersion;
import com.here.naksha.lib.core.lambdas.Pe1;
import com.here.naksha.lib.core.models.TxSignalSet;
@@ -86,9 +85,8 @@ public PsqlStorage(@NotNull PsqlConfig config, long storageNumber) {
* @param connector The connector associated with this storage
*/
public PsqlStorage(@NotNull Connector connector) {
final PsqlConfig dbConfig =
JsonSerializable.fromAnyMap(connector.getProperties(), ConnectorProperties.class)
.getDbConfig();
final PsqlConfig dbConfig = JsonSerializable.fromAnyMap(connector.getProperties(), ConnectorProperties.class)
.getDbConfig();
if (dbConfig == null) {
throw new IllegalArgumentException("dbConfig missing in connector properties");
}
@@ -142,10 +140,8 @@ public final long getStorageNumber() {
}

static final String C3P0EXT_CONFIG_SCHEMA = "config.schema()"; // TODO: Why do we need this?
static final String[] extensionList =
new String[] {"postgis", "postgis_topology", "tsm_system_rows", "dblink"};
static final String[] localScripts =
new String[] {"/xyz_ext.sql", "/h3Core.sql", "/naksha_ext.sql"};
static final String[] extensionList = new String[] {"postgis", "postgis_topology", "tsm_system_rows", "dblink"};
static final String[] localScripts = new String[] {"/xyz_ext.sql", "/h3Core.sql", "/naksha_ext.sql"};

// We can store meta-information at tables using
// COMMENT ON TABLE "xyz_config"."transactions" IS '{"id":"transactions"}';
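The comment above points at the trick used for per-table metadata: a JSON blob stored as a PostgreSQL table comment. A minimal sketch of writing and reading back such a comment (the table name is taken from the comment above; obj_description is the standard catalog helper):

COMMENT ON TABLE "xyz_config"."transactions" IS '{"id":"transactions"}';
SELECT obj_description('"xyz_config"."transactions"'::regclass, 'pg_class');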
@@ -188,18 +184,20 @@ public void init() {
}
if (latest.toLong() != version) {
if (version == 0L) {
currentLogger().atInfo("Install and initialize Naksha extension v{}").add(latest).log();
currentLogger()
.atInfo("Install and initialize Naksha extension v{}")
.add(latest)
.log();
} else {
currentLogger()
.atInfo("Upgrade Naksha extension from v{} to v{}")
.add(new NakshaVersion(version))
.add(latest)
.log();
}
SQL =
IoHelp.readResource("naksha_ext.sql")
.replaceAll("\\$\\{schema}", getSchema())
.replaceAll("\\$\\{storage_id}", Long.toString(getStorageNumber()));
SQL = IoHelp.readResource("naksha_ext.sql")
.replaceAll("\\$\\{schema}", getSchema())
.replaceAll("\\$\\{storage_id}", Long.toString(getStorageNumber()));
stmt.execute(SQL);
conn.commit();

@@ -222,7 +220,7 @@ public void init() {
}
}

public static int maxHistoryAgeInDays = 30; //TODO this or Space.maxHistoryAge
public static int maxHistoryAgeInDays = 30; // TODO this or Space.maxHistoryAge

/**
* Review all collections and ensure that the history does have the needed partitions created. The
@@ -236,26 +234,37 @@ public void maintain(@NotNull List<CollectionInfo> collectionInfoList) {
for (CollectionInfo collectionInfo : collectionInfoList) {
try (final Connection conn = dataSource.getConnection()) {
try (final Statement stmt = conn.createStatement()) {
stmt.execute(createPartitionOfOneDay(0, collectionInfo));
stmt.execute(createPartitionOfOneDay(1, collectionInfo));
stmt.execute(createPartitionOfOneDay(2, collectionInfo));
// stmt.execute(deletePartitionOfOneDay(maxHistoryAgeInDays, collectionInfo));
// stmt.execute(deletePartitionOfOneDay(maxHistoryAgeInDays+1, collectionInfo));
// stmt.execute(deletePartitionOfOneDay(maxHistoryAgeInDays+2, collectionInfo));
// stmt.execute(deletePartitionOfOneDay(maxHistoryAgeInDays+3, collectionInfo));
// stmt.execute(deletePartitionOfOneDay(maxHistoryAgeInDays+4, collectionInfo));
// stmt.execute(deletePartitionOfOneDay(maxHistoryAgeInDays+5, collectionInfo));
stmt.execute(createHstPartitionOfOneDay(0, collectionInfo));
stmt.execute(createHstPartitionOfOneDay(1, collectionInfo));
stmt.execute(createHstPartitionOfOneDay(2, collectionInfo));
stmt.execute(createTxPartitionOfOneDay(0, collectionInfo));
stmt.execute(createTxPartitionOfOneDay(1, collectionInfo));
stmt.execute(createTxPartitionOfOneDay(2, collectionInfo));
// stmt.execute(deleteHstPartitionOfOneDay(maxHistoryAgeInDays, collectionInfo));
// stmt.execute(deleteHstPartitionOfOneDay(maxHistoryAgeInDays+1, collectionInfo));
// stmt.execute(deleteHstPartitionOfOneDay(maxHistoryAgeInDays+2, collectionInfo));
// stmt.execute(deleteHstPartitionOfOneDay(maxHistoryAgeInDays+3, collectionInfo));
// stmt.execute(deleteHstPartitionOfOneDay(maxHistoryAgeInDays+4, collectionInfo));
// stmt.execute(deleteHstPartitionOfOneDay(maxHistoryAgeInDays+5, collectionInfo));
//TODO only delete transaction partitions for admin DB
// stmt.execute(deleteTxPartitionOfOneDay(maxHistoryAgeInDays, collectionInfo));
// stmt.execute(deleteTxPartitionOfOneDay(maxHistoryAgeInDays+1, collectionInfo));
// stmt.execute(deleteTxPartitionOfOneDay(maxHistoryAgeInDays+2, collectionInfo));
// stmt.execute(deleteTxPartitionOfOneDay(maxHistoryAgeInDays+3, collectionInfo));
// stmt.execute(deleteTxPartitionOfOneDay(maxHistoryAgeInDays+4, collectionInfo));
// stmt.execute(deleteTxPartitionOfOneDay(maxHistoryAgeInDays+5, collectionInfo));

}
//commit once for every single collection so that partial progress is saved in case something fails midway
// commit once for every single collection so that partial progress is saved in case something fails
// midway
conn.commit();
} catch (Throwable t) {
throw unchecked(t);
}
}
}

private String createPartitionOfOneDay(int dayPlus, CollectionInfo collectionInfo) {
private String createHstPartitionOfOneDay(int dayPlus, CollectionInfo collectionInfo) {
return new StringBuilder()
.append("SELECT ")
.append(getSchema())
@@ -267,7 +276,19 @@ private String createPartitionOfOneDay(int dayPlus, CollectionInfo collectionInfo) {
.toString();
}

private String deletePartitionOfOneDay(int dayOld, CollectionInfo collectionInfo) {
private String createTxPartitionOfOneDay(int dayPlus, CollectionInfo collectionInfo) {
return new StringBuilder()
.append("SELECT ")
.append(getSchema())
.append(".__naksha_create_tx_partition_for_day('")
.append(collectionInfo.getId())
.append("',current_timestamp+'")
.append(dayPlus)
.append(" day'::interval);")
.toString();
}

private String deleteHstPartitionOfOneDay(int dayOld, CollectionInfo collectionInfo) {
return new StringBuilder()
.append("SELECT ")
.append(getSchema())
@@ -279,14 +300,25 @@ private String deletePartitionOfOneDay(int dayOld, CollectionInfo collectionInfo) {
.toString();
}

private String deleteTxPartitionOfOneDay(int dayOld, CollectionInfo collectionInfo) {
return new StringBuilder()
.append("SELECT ")
.append(getSchema())
.append(".__naksha_delete_tx_partition_for_day('")
.append(collectionInfo.getId())
.append("',current_timestamp-'")
.append(dayOld)
.append(" day'::interval);")
.toString();
}
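The four helpers above only assemble SQL strings; the real work is done by the __naksha_*_partition_for_day functions in naksha_ext.sql. A rough sketch of what maintain() ends up executing for one collection — the schema xyz_config, the collection foo, and the 30-day cut-off are illustrative values:

SELECT xyz_config.__naksha_create_tx_partition_for_day('foo', current_timestamp + '0 day'::interval);
SELECT xyz_config.__naksha_create_tx_partition_for_day('foo', current_timestamp + '1 day'::interval);
SELECT xyz_config.__naksha_create_tx_partition_for_day('foo', current_timestamp + '2 day'::interval);
-- the (currently commented-out) cleanup would then issue, for example:
SELECT xyz_config.__naksha_delete_tx_partition_for_day('foo', current_timestamp - '30 day'::interval);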

/**
* Create default transaction settings.
*
* @return New transaction settings.
*/
public @NotNull ITransactionSettings createSettings() {
return new PsqlTransactionSettings(
dataSource.pool.config.stmtTimeout, dataSource.pool.config.lockTimeout);
return new PsqlTransactionSettings(dataSource.pool.config.stmtTimeout, dataSource.pool.config.lockTimeout);
}

@Override
@@ -328,11 +360,9 @@ private void setupH3() throws SQLException {
final Statement stmt = connection.createStatement()) {
boolean needUpdate = false;
ResultSet rs;
if ((rs =
stmt.executeQuery(
"select count(1)::integer from pg_catalog.pg_proc r inner join"
+ " pg_catalog.pg_namespace l on ( r.pronamespace = l.oid ) where "
+ "l.nspname = 'h3' and r.proname = 'h3_version'"))
if ((rs = stmt.executeQuery("select count(1)::integer from pg_catalog.pg_proc r inner join"
+ " pg_catalog.pg_namespace l on ( r.pronamespace = l.oid ) where "
+ "l.nspname = 'h3' and r.proname = 'h3_version'"))
.next()) {
needUpdate = (0 == rs.getInt(1));
}
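For readability, the string-concatenated query above that decides whether the H3 functions need (re)installing boils down to this statement:

SELECT count(1)::integer
FROM pg_catalog.pg_proc r
INNER JOIN pg_catalog.pg_namespace l ON (r.pronamespace = l.oid)
WHERE l.nspname = 'h3' AND r.proname = 'h3_version';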
18 changes: 18 additions & 0 deletions here-naksha-lib-psql/src/main/resources/naksha_ext.sql
@@ -1838,4 +1838,22 @@ BEGIN
sql := format('DROP TABLE IF EXISTS %I;', hst_part_name);
EXECUTE sql;
END
$BODY$;

-- Drop the transaction partition table for the given date.
CREATE OR REPLACE FUNCTION __naksha_delete_tx_partition_for_day(collection text, from_ts timestamptz)
RETURNS void
LANGUAGE 'plpgsql' VOLATILE
AS $BODY$
DECLARE
sql text;
from_day text;
tx_part_name text;
BEGIN
from_day := to_char(from_ts, 'YYYY_MM_DD');
tx_part_name := format('%s_tx_%s', collection, from_day); -- example: foo_tx_2023_03_01

sql := format('DROP TABLE IF EXISTS %I;', tx_part_name);
EXECUTE sql;
END
$BODY$;
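A quick way to exercise the new function by hand — the schema and collection name are illustrative; per the format() calls above, this drops a partition named like foo_tx_2023_03_01 if it exists:

SELECT xyz_config.__naksha_delete_tx_partition_for_day('foo', current_timestamp - '30 day'::interval);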
