Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replacing collapsing merge tree #7

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/Core/SettingsEnums.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,10 @@ IMPLEMENT_SETTING_ENUM_WITH_RENAME(DefaultTableEngine, ErrorCodes::BAD_ARGUMENTS
{"StripeLog", DefaultTableEngine::StripeLog},
{"MergeTree", DefaultTableEngine::MergeTree},
{"ReplacingMergeTree", DefaultTableEngine::ReplacingMergeTree},
{"ReplacingCollapsingMergeTree", DefaultTableEngine::ReplacingCollapsingMergeTree},
{"ReplicatedMergeTree", DefaultTableEngine::ReplicatedMergeTree},
{"ReplicatedReplacingMergeTree", DefaultTableEngine::ReplicatedReplacingMergeTree},
{"ReplicatedReplacingCollapsingMergeTree", DefaultTableEngine::ReplicatedReplacingCollapsingMergeTree},
{"Memory", DefaultTableEngine::Memory}})

IMPLEMENT_SETTING_MULTI_ENUM(MySQLDataTypesSupport, ErrorCodes::UNKNOWN_MYSQL_DATATYPES_SUPPORT_LEVEL,
Expand Down
2 changes: 2 additions & 0 deletions src/Core/SettingsEnums.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,10 @@ enum class DefaultTableEngine
StripeLog,
MergeTree,
ReplacingMergeTree,
ReplacingCollapsingMergeTree,
ReplicatedMergeTree,
ReplicatedReplacingMergeTree,
ReplicatedReplacingCollapsingMergeTree,
Memory,
};

Expand Down
6 changes: 6 additions & 0 deletions src/Interpreters/InterpreterCreateQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -822,12 +822,18 @@ String InterpreterCreateQuery::getTableEngineName(DefaultTableEngine default_tab
case DefaultTableEngine::ReplacingMergeTree:
return "ReplacingMergeTree";

case DefaultTableEngine::ReplacingCollapsingMergeTree:
return "ReplacingCollapsingMergeTree";

case DefaultTableEngine::ReplicatedMergeTree:
return "ReplicatedMergeTree";

case DefaultTableEngine::ReplicatedReplacingMergeTree:
return "ReplicatedReplacingMergeTree";

case DefaultTableEngine::ReplicatedReplacingCollapsingMergeTree:
return "ReplicatedReplacingCollapsingMergeTree";

case DefaultTableEngine::Memory:
return "Memory";

Expand Down
5 changes: 5 additions & 0 deletions src/Processors/QueryPlan/ReadFromMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <Processors/Merges/GraphiteRollupSortedTransform.h>
#include <Processors/Merges/MergingSortedTransform.h>
#include <Processors/Merges/ReplacingSortedTransform.h>
#include <Processors/Merges/ReplacingCollapsingSortedTransform.h>
#include <Processors/Merges/SummingSortedTransform.h>
#include <Processors/Merges/VersionedCollapsingTransform.h>
#include <Processors/QueryPlan/PartsSplitter.h>
Expand Down Expand Up @@ -633,6 +634,10 @@ static void addMergingFinal(
return std::make_shared<ReplacingSortedTransform>(header, num_outputs,
sort_description, merging_params.version_column, max_block_size);

case MergeTreeData::MergingParams::ReplacingCollapsing:
return std::make_shared<ReplacingCollapsingSortedTransform>(header, num_outputs,
sort_description, merging_params.sign_column, merging_params.version_column, max_block_size);

case MergeTreeData::MergingParams::VersionedCollapsing:
return std::make_shared<VersionedCollapsingTransform>(header, num_outputs,
sort_description, merging_params.sign_column, max_block_size);
Expand Down
14 changes: 14 additions & 0 deletions src/Storages/MergeTree/MergeTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <Processors/Merges/CollapsingSortedTransform.h>
#include <Processors/Merges/SummingSortedTransform.h>
#include <Processors/Merges/ReplacingSortedTransform.h>
#include <Processors/Merges/ReplacingCollapsingSortedTransform.h>
#include <Processors/Merges/GraphiteRollupSortedTransform.h>
#include <Processors/Merges/AggregatingSortedTransform.h>
#include <Processors/Merges/VersionedCollapsingTransform.h>
Expand Down Expand Up @@ -66,6 +67,12 @@ static void extractMergingAndGatheringColumns(
if (merging_params.mode == MergeTreeData::MergingParams::Replacing)
key_columns.emplace(merging_params.version_column);

/// Force sign & version column for ReplacingCollapsing mode
if (merging_params.mode == MergeTreeData::MergingParams::ReplacingCollapsing){
key_columns.emplace(merging_params.sign_column);
key_columns.emplace(merging_params.version_column);
}

/// Force sign column for VersionedCollapsing mode. Version is already in primary key.
if (merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing)
key_columns.emplace(merging_params.sign_column);
Expand Down Expand Up @@ -896,6 +903,12 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
merge_block_size, ctx->rows_sources_write_buf.get(), ctx->blocks_are_granules_size);
break;

case MergeTreeData::MergingParams::ReplacingCollapsing:
merged_transform = std::make_shared<ReplacingCollapsingSortedTransform>(
header, pipes.size(), sort_description, ctx->merging_params.sign_column, ctx->merging_params.version_column,
merge_block_size, ctx->rows_sources_write_buf.get(), ctx->blocks_are_granules_size);
break;

case MergeTreeData::MergingParams::Graphite:
merged_transform = std::make_shared<GraphiteRollupSortedTransform>(
header, pipes.size(), sort_description, merge_block_size,
Expand Down Expand Up @@ -958,6 +971,7 @@ MergeAlgorithm MergeTask::ExecuteAndFinalizeHorizontalPart::chooseMergeAlgorithm
ctx->merging_params.mode == MergeTreeData::MergingParams::Ordinary ||
ctx->merging_params.mode == MergeTreeData::MergingParams::Collapsing ||
ctx->merging_params.mode == MergeTreeData::MergingParams::Replacing ||
ctx->merging_params.mode == MergeTreeData::MergingParams::ReplacingCollapsing ||
ctx->merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing;

bool enough_ordinary_cols = global_ctx->gathering_columns.size() >= data_settings->vertical_merge_algorithm_min_columns_to_activate;
Expand Down
28 changes: 18 additions & 10 deletions src/Storages/MergeTree/MergeTreeData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ bool MergeTreeData::supportsFinal() const
|| merging_params.mode == MergingParams::Summing
|| merging_params.mode == MergingParams::Aggregating
|| merging_params.mode == MergingParams::Replacing
|| merging_params.mode == MergingParams::ReplacingCollapsing
|| merging_params.mode == MergingParams::Graphite
|| merging_params.mode == MergingParams::VersionedCollapsing;
}
Expand Down Expand Up @@ -694,12 +695,12 @@ void MergeTreeData::MergingParams::check(const StorageInMemoryMetadata & metadat
{
const auto columns = metadata.getColumns().getAllPhysical();

if (!sign_column.empty() && mode != MergingParams::Collapsing && mode != MergingParams::VersionedCollapsing)
if (!sign_column.empty() && mode != MergingParams::Collapsing && mode != MergingParams::VersionedCollapsing && mode != MergingParams::ReplacingCollapsing)
throw Exception("Sign column for MergeTree cannot be specified in modes except Collapsing or VersionedCollapsing.",
ErrorCodes::LOGICAL_ERROR);

if (!version_column.empty() && mode != MergingParams::Replacing && mode != MergingParams::VersionedCollapsing)
throw Exception("Version column for MergeTree cannot be specified in modes except Replacing or VersionedCollapsing.",
if (!version_column.empty() && mode != MergingParams::Replacing && mode != MergingParams::VersionedCollapsing && mode != MergingParams::ReplacingCollapsing)
throw Exception("Version column for MergeTree cannot be specified in modes except Replacing, ReplacingCollapsing or VersionedCollapsing.",
ErrorCodes::LOGICAL_ERROR);

if (!columns_to_sum.empty() && mode != MergingParams::Summing)
Expand Down Expand Up @@ -798,6 +799,12 @@ void MergeTreeData::MergingParams::check(const StorageInMemoryMetadata & metadat
if (mode == MergingParams::Replacing)
check_version_column(true, "ReplacingMergeTree");

if (mode == MergingParams::ReplacingCollapsing)
{
check_sign_column(false, "ReplacingCollapsingMergeTree");
check_version_column(false, "ReplacingCollapsingMergeTree");
}

if (mode == MergingParams::VersionedCollapsing)
{
check_sign_column(false, "VersionedCollapsingMergeTree");
Expand Down Expand Up @@ -913,13 +920,14 @@ String MergeTreeData::MergingParams::getModeName() const
{
switch (mode)
{
case Ordinary: return "";
case Collapsing: return "Collapsing";
case Summing: return "Summing";
case Aggregating: return "Aggregating";
case Replacing: return "Replacing";
case Graphite: return "Graphite";
case VersionedCollapsing: return "VersionedCollapsing";
case Ordinary: return "";
case Collapsing: return "Collapsing";
case Summing: return "Summing";
case Aggregating: return "Aggregating";
case Replacing: return "Replacing";
case ReplacingCollapsing: return "ReplacingCollapsing";
case Graphite: return "Graphite";
case VersionedCollapsing: return "VersionedCollapsing";
}

__builtin_unreachable();
Expand Down
3 changes: 2 additions & 1 deletion src/Storages/MergeTree/MergeTreeData.h
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ class MergeTreeData : public IStorage, public WithMutableContext
Replacing = 5,
Graphite = 6,
VersionedCollapsing = 7,
ReplacingCollapsing = 8,
};

Mode mode;
Expand All @@ -354,7 +355,7 @@ class MergeTreeData : public IStorage, public WithMutableContext
/// For Summing mode. If empty - columns_to_sum is determined automatically.
Names columns_to_sum;

/// For Replacing and VersionedCollapsing mode. Can be empty for Replacing.
/// For Replacing, VersionedCollapsing and ReplacingCollapsing mode. Can be empty for Replacing.
String version_column;

/// For Graphite mode.
Expand Down
1 change: 1 addition & 0 deletions src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <Processors/Merges/CollapsingSortedTransform.h>
#include <Processors/Merges/SummingSortedTransform.h>
#include <Processors/Merges/ReplacingSortedTransform.h>
#include <Processors/Merges/ReplacingCollapsingSortedTransform.h>
#include <Processors/Merges/GraphiteRollupSortedTransform.h>
#include <Processors/Merges/AggregatingSortedTransform.h>
#include <Processors/Merges/VersionedCollapsingTransform.h>
Expand Down
4 changes: 4 additions & 0 deletions src/Storages/MergeTree/MergeTreeDataWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <Parsers/queryToString.h>

#include <Processors/Merges/Algorithms/ReplacingSortedAlgorithm.h>
#include <Processors/Merges/Algorithms/ReplacingCollapsingSortedAlgorithm.h>
#include <Processors/Merges/Algorithms/MergingSortedAlgorithm.h>
#include <Processors/Merges/Algorithms/CollapsingSortedAlgorithm.h>
#include <Processors/Merges/Algorithms/SummingSortedAlgorithm.h>
Expand Down Expand Up @@ -226,6 +227,9 @@ Block MergeTreeDataWriter::mergeBlock(
case MergeTreeData::MergingParams::Replacing:
return std::make_shared<ReplacingSortedAlgorithm>(
block, 1, sort_description, merging_params.version_column, block_size + 1);
case MergeTreeData::MergingParams::ReplacingCollapsing:
return std::make_shared<ReplacingCollapsingSortedAlgorithm>(
block, 1, sort_description, merging_params.sign_column, merging_params.version_column, block_size + 1);
case MergeTreeData::MergingParams::Collapsing:
return std::make_shared<CollapsingSortedAlgorithm>(
block, 1, sort_description, merging_params.sign_column,
Expand Down
27 changes: 26 additions & 1 deletion src/Storages/MergeTree/registerStorageMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ static ColumnsDescription getColumnsDescriptionFromZookeeper(const String & raw_

static StoragePtr create(const StorageFactory::Arguments & args)
{
/** [Replicated][|Summing|VersionedCollapsing|Collapsing|Aggregating|Replacing|Graphite]MergeTree (2 * 7 combinations) engines
/** [Replicated][|Summing|VersionedCollapsing|Collapsing|Aggregating|Replacing|ReplacingCollapsing|Graphite]MergeTree (2 * 7 combinations) engines
* The argument for the engine should be:
* - (for Replicated) The path to the table in ZooKeeper
* - (for Replicated) Replica name in ZooKeeper
Expand All @@ -144,6 +144,7 @@ static StoragePtr create(const StorageFactory::Arguments & args)
* SummingMergeTree(date, [sample_key], primary_key, index_granularity, [columns_to_sum])
* AggregatingMergeTree(date, [sample_key], primary_key, index_granularity)
* ReplacingMergeTree(date, [sample_key], primary_key, index_granularity, [version_column])
* ReplacingCollapsingMergeTree(date, [sample_key], primary_key, index_granularity, sign, version_column)
* GraphiteMergeTree(date, [sample_key], primary_key, index_granularity, 'config_element')
*
* Alternatively, you can specify:
Expand Down Expand Up @@ -175,6 +176,8 @@ static StoragePtr create(const StorageFactory::Arguments & args)
merging_params.mode = MergeTreeData::MergingParams::Aggregating;
else if (name_part == "Replacing")
merging_params.mode = MergeTreeData::MergingParams::Replacing;
else if (name_part == "ReplacingCollapsing")
merging_params.mode = MergeTreeData::MergingParams::ReplacingCollapsing;
else if (name_part == "Graphite")
merging_params.mode = MergeTreeData::MergingParams::Graphite;
else if (name_part == "VersionedCollapsing")
Expand Down Expand Up @@ -236,6 +239,10 @@ static StoragePtr create(const StorageFactory::Arguments & args)
case MergeTreeData::MergingParams::Replacing:
add_optional_param("version");
break;
case MergeTreeData::MergingParams::ReplacingCollapsing:
add_mandatory_param("sign column");
add_mandatory_param("version");
break;
case MergeTreeData::MergingParams::Collapsing:
add_mandatory_param("sign column");
break;
Expand Down Expand Up @@ -455,6 +462,22 @@ static StoragePtr create(const StorageFactory::Arguments & args)
--arg_cnt;
}
}
else if (merging_params.mode == MergeTreeData::MergingParams::ReplacingCollapsing)
{
if (!tryGetIdentifierNameInto(engine_args[arg_cnt - 1], merging_params.version_column))
throw Exception(
"Version column name must be an unquoted string" + getMergeTreeVerboseHelp(is_extended_storage_def),
ErrorCodes::BAD_ARGUMENTS);

--arg_cnt;

if (!tryGetIdentifierNameInto(engine_args[arg_cnt - 1], merging_params.sign_column))
throw Exception(
"Sign column name must be an unquoted string" + getMergeTreeVerboseHelp(is_extended_storage_def),
ErrorCodes::BAD_ARGUMENTS);

--arg_cnt;
}
else if (merging_params.mode == MergeTreeData::MergingParams::Summing)
{
/// If the last element is not index_granularity or replica_name (a literal), then this is a list of summable columns.
Expand Down Expand Up @@ -734,6 +757,7 @@ void registerStorageMergeTree(StorageFactory & factory)
factory.registerStorage("MergeTree", create, features);
factory.registerStorage("CollapsingMergeTree", create, features);
factory.registerStorage("ReplacingMergeTree", create, features);
factory.registerStorage("ReplacingCollapsingMergeTree", create, features);
factory.registerStorage("AggregatingMergeTree", create, features);
factory.registerStorage("SummingMergeTree", create, features);
factory.registerStorage("GraphiteMergeTree", create, features);
Expand All @@ -746,6 +770,7 @@ void registerStorageMergeTree(StorageFactory & factory)
factory.registerStorage("ReplicatedMergeTree", create, features);
factory.registerStorage("ReplicatedCollapsingMergeTree", create, features);
factory.registerStorage("ReplicatedReplacingMergeTree", create, features);
factory.registerStorage("ReplicatedReplacingCollapsingMergeTree", create, features);
factory.registerStorage("ReplicatedAggregatingMergeTree", create, features);
factory.registerStorage("ReplicatedSummingMergeTree", create, features);
factory.registerStorage("ReplicatedGraphiteMergeTree", create, features);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
d1 1 5

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we have 3 times the same d1?
Why do we have 3 times d2, with different values? etc

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This "reference" file is the expected output of the {same name}.sql
If we look at the sql file, we will see 3 calls to select * from test; that's why. The three output are following without blank/newline in-between.
So you should have 2 times:

d1	1	5
d2	1	1
d3	1	1
d4	1	3
d5	1	1
d6	-1	2

and the last insert add overlapping data with new ones, so the output is a little different with

d1	1	5
d2	1	3
d3	1	3
d4	1	3
d5	1	1
d6	-1	2

new versions of d2 and d3 are in this new batch.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh I somehow missed that there're 3 SELECT calls 👍

d2 1 1
d3 1 1
d4 1 3
d5 1 1
d6 -1 2
d1 1 5
d2 1 1
d3 1 1
d4 1 3
d5 1 1
d6 -1 2
d1 1 5
d2 1 3
d3 1 3
d4 1 3
d5 1 1
d6 -1 2
12 changes: 12 additions & 0 deletions tests/queries/0_stateless/02391_ReplacingCollapsingMergeTree.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
CREATE TABLE test (uid String, sign Int8, version UInt32) ENGINE = ReplacingCollapsingMergeTree(sign, version) Order by (uid);

INSERT INTO test (*) VALUES ('d1', 1, 1), ('d2', 1, 1), ('d6', 1, 1), ('d4', 1, 1), ('d6', -1, 2), ('d3', 1, 1), ('d1', -1, 2), ('d5', 1, 1), ('d4', -1, 2), ('d1', 1, 3), ('d1', -1, 4), ('d4', 1, 3), ('d1', 1, 5);
Copy link

@gontarzpawel gontarzpawel Aug 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we shuffle the input values so that we have:

('d1', 1, 3),('d1', -1, 2),('d1', 1, 1)

?

Copy link
Author

@youennL-cs youennL-cs Aug 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can, I just want to check that if we insert values in "random" order, it doesn't affect the output.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can still be random, I was just curious about sorting (but it has to work otherwise next tests wouldn't work)

select * from test FINAL;

-- Test inserting Backup
INSERT INTO test (*) VALUES ('d6', 1, 1), ('d4', 1, 1), ('d6', -1, 2), ('d3', 1, 1), ('d1', -1, 2), ('d5', 1, 1), ('d4', -1, 2);
select * from test FINAL;

-- test insert second batch with overlaping data
INSERT INTO test (*) VALUES ('d4', 1, 1), ('d6', -1, 2), ('d3', 1, 1), ('d1', -1, 2), ('d5', 1, 1), ('d4', -1, 2), ('d1', 1, 3), ('d1', -1, 4), ('d4', 1, 3), ('d1', 1, 5), ('d2', -1, 2), ('d2', 1, 3), ('d3', -1, 2), ('d3', 1, 3);
select * from test FINAL;