Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Addresses issues with generated models being overly eager to refresh #2406

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion warehouse/metrics_tools/dialect/translate.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def names(self):
return list(self._map.keys())


def send_anonymous_to_callable(anon: exp.Anonymous, f: t.Callable):
def send_anonymous_to_callable[T](anon: exp.Anonymous, f: t.Callable[..., T]):
# much of this is taken from sqlmesh.core.macros
args = []
kwargs = {}
Expand Down
30 changes: 27 additions & 3 deletions warehouse/metrics_tools/lib/factories/definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
from sqlmesh.core.macros import MacroEvaluator
from sqlmesh.utils.date import TimeLike

from metrics_tools.dialect.translate import CustomFuncHandler, CustomFuncRegistry
from metrics_tools.dialect.translate import (
CustomFuncHandler,
CustomFuncRegistry,
)
from metrics_tools.evaluator import FunctionsTransformer

CURR_DIR = os.path.dirname(__file__)
Expand Down Expand Up @@ -377,6 +380,26 @@ def table_name(self, ref: PeerMetricDependencyRef):
name = self._source.name or self._name
return reference_to_str(ref, name)

def dependencies(
    self, ref: PeerMetricDependencyRef, peer_table_map: t.Dict[str, str]
) -> t.List[str]:
    """Return the peer tables this metric query depends on.

    Scans the query's expressions for ``metrics_peer_ref(...)`` macro
    calls and collects every key of ``peer_table_map`` that contains the
    referenced dependency name as a substring.

    Args:
        ref: The dependency ref being generated. Currently unused here;
            kept so the signature stays stable for callers.
        peer_table_map: Mapping of peer dependency reference names to
            physical table names.

    Returns:
        A sorted list of the matching ``peer_table_map`` keys. Sorting
        keeps the result deterministic between runs (plain set iteration
        order is not stable across processes), which prevents generated
        model configs from spuriously changing and triggering refreshes.
    """
    dependencies: t.Set[str] = set()

    for expression in self._expressions:
        for anonymous in expression.find_all(exp.Anonymous):
            if anonymous.this != "metrics_peer_ref":
                continue
            # The first argument of metrics_peer_ref(...) names the peer
            # metric; any table key containing that name is a dependency.
            dep_name = anonymous.expressions[0].sql()
            dependencies.update(
                key for key in peer_table_map if dep_name in key
            )
    return sorted(dependencies)

def generate_dependency_refs_for_name(self, name: str):
refs: t.List[PeerMetricDependencyRef] = []
for entity in self._source.entity_types or DEFAULT_ENTITY_TYPES:
Expand Down Expand Up @@ -808,7 +831,7 @@ class GeneratedArtifactConfig(t.TypedDict):
query_reference_name: str
query_def_as_input: MetricQueryInput
default_dialect: str
peer_table_map: t.Dict[str, str]
peer_table_tuples: t.List[t.Tuple[str, str]]
ref: PeerMetricDependencyRef
timeseries_sources: t.List[str]

Expand All @@ -818,7 +841,7 @@ def generated_entity(
query_reference_name: str,
query_def_as_input: MetricQueryInput,
default_dialect: str,
peer_table_map: t.Dict[str, str],
peer_table_tuples: t.List[t.Tuple[str, str]],
ref: PeerMetricDependencyRef,
timeseries_sources: t.List[str],
):
Expand All @@ -828,6 +851,7 @@ def generated_entity(
default_dialect=default_dialect,
source=query_def,
)
peer_table_map = dict(peer_table_tuples)
e = query.generate_query_ref(
ref,
evaluator,
Expand Down
20 changes: 18 additions & 2 deletions warehouse/metrics_tools/lib/factories/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,21 @@ def generate_models_from_query(
assert query._source.rolling
cron = query._source.rolling["cron"]

# Clean up the peer_table_map (this is a hack to prevent unnecessary
# runs when the metrics factory is updated)
query_dependencies = query.dependencies(ref, peer_table_map)
# So much of this needs to be refactored, but for now this ensures
# that the dict doesn't randomly "change" between runs. I don't
# think the ordering will be consistent between python machines, but
# let's see for now.
reduced_peer_table_tuples = [(k, peer_table_map[k]) for k in query_dependencies]
reduced_peer_table_tuples.sort()

config = GeneratedArtifactConfig(
query_reference_name=query_reference_name,
query_def_as_input=query_def_as_input,
default_dialect=default_dialect,
peer_table_map=peer_table_map,
peer_table_tuples=reduced_peer_table_tuples,
ref=ref,
timeseries_sources=timeseries_sources,
)
Expand Down Expand Up @@ -273,5 +283,11 @@ def timeseries_metrics(
kind="VIEW",
dialect="clickhouse",
start=raw_options["start"],
columns=METRICS_COLUMNS_BY_ENTITY[entity_type],
columns={
k: METRICS_COLUMNS_BY_ENTITY[entity_type][k]
for k in filter(
lambda col: col not in ["event_source"],
METRICS_COLUMNS_BY_ENTITY[entity_type].keys(),
)
},
)
1 change: 1 addition & 0 deletions warehouse/metrics_tools/lib/local/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

project_id = os.getenv("GOOGLE_PROJECT_ID")


def bq_to_duckdb(table_mapping: t.Dict[str, str], duckdb_path: str):
"""Copies the tables in table_mapping to tables in duckdb

Expand Down
Loading