diff --git a/autoagora/config.py b/autoagora/config.py
index af576b2..5a7051b 100644
--- a/autoagora/config.py
+++ b/autoagora/config.py
@@ -189,6 +189,22 @@ def init_config(argv: Optional[Sequence[str]] = None):
         default=3600,
         help="(Seconds) Interval between rebuilds of the relative query costs models.",
     )
+    argparser_relative_query_costs.add_argument(
+        "--graph-node-query-endpoint",
+        env_var="GRAPH_NODE_QUERY_ENDPOINT",
+        required=False,
+        help="Graph-node query endpoint. Used to re-execute logged queries.",
+    )
+    argparser_relative_query_costs.add_argument(
+        "--multi-root-queries",
+        env_var="MULTI_ROOT_QUERIES",
+        required=False,
+        action="store_true",
+        default=False,
+        help="Enables the multi-root query evaluator: runs randomly selected query "
+        "roots from logged multi-root queries to build a statistical model of their "
+        "execution time.",
+    )
     #
     # Manual agora entry values
     #
diff --git a/autoagora/graph_node_utils.py b/autoagora/graph_node_utils.py
new file mode 100644
index 0000000..263b3e7
--- /dev/null
+++ b/autoagora/graph_node_utils.py
@@ -0,0 +1,24 @@
+# Copyright 2022-, Semiotic AI, Inc.
+# SPDX-License-Identifier: Apache-2.0
+
+import logging
+from typing import Mapping, Optional
+
+import aiohttp
+import backoff
+from gql import Client, gql
+from gql.transport.aiohttp import AIOHTTPTransport
+
+from autoagora.config import args
+
+
+@backoff.on_exception(
+    backoff.expo, aiohttp.ClientError, max_time=30, logger=logging.root
+)
+async def query_graph_node(query: str, variables: Optional[Mapping] = None):
+    async with Client(
+        transport=AIOHTTPTransport(args.graph_node_query_endpoint),
+        fetch_schema_from_transport=False,
+    ) as session:
+        result = await session.execute(gql(query), variable_values=variables)  # type: ignore
+        return result
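For review convenience, a minimal sketch of how the new helper is meant to be driven (not part of the diff; the query string and variable values are hypothetical). `query_graph_node` reads `args.graph_node_query_endpoint`, so `init_config()` must have been called first with `--graph-node-query-endpoint` (or `GRAPH_NODE_QUERY_ENDPOINT`), and `--multi-root-queries` (or `MULTI_ROOT_QUERIES`) enables the evaluator itself:

```python
# Sketch only -- assumes init_config() has already been run with
# --graph-node-query-endpoint, as the tests further down do.
import asyncio as aio

from autoagora.graph_node_utils import query_graph_node


async def demo():
    # Re-execute a logged query skeleton with one logged variable set.
    # Retries on aiohttp.ClientError with exponential backoff for up to 30 s.
    result = await query_graph_node(
        "query ($_0: String) { info(id: $_0) { stat val } }",
        {"_0": "some-entity-id"},  # hypothetical value
    )
    print(result)


aio.run(demo())
```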
diff --git a/autoagora/logs_db.py b/autoagora/logs_db.py
index d276e74..e8b9063 100644
--- a/autoagora/logs_db.py
+++ b/autoagora/logs_db.py
@@ -1,6 +1,8 @@
 # Copyright 2022-, Semiotic AI, Inc.
 # SPDX-License-Identifier: Apache-2.0
 
+import json
 from dataclasses import dataclass
+from datetime import datetime
 
 import graphql
 import psycopg_pool
@@ -17,8 +19,16 @@ class QueryStats:
        avg_time: float
        stddev_time: float
 
+    @dataclass
+    class MRQ_Info:
+        hash: bytes
+        query: str
+        timestamp: datetime
+        query_time_ms: int = 0
+
    def __init__(self, pgpool: psycopg_pool.AsyncConnectionPool) -> None:
        self.pgpool = pgpool
+        self.timestamp = datetime.now()
 
    def return_query_body(self, query):
        # Keep only query body -- ie. no var defs
@@ -28,7 +38,22 @@ def return_query_body(self, query):
        query = "query " + graphql.print_ast(query)
        return query
 
-    async def get_most_frequent_queries(
+    async def create_mrq_log_table(self) -> None:
+        async with self.pgpool.connection() as connection:
+            await connection.execute(
+                """
+                CREATE TABLE IF NOT EXISTS mrq_query_logs (
+                    id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
+                    subgraph char(46) NOT NULL,
+                    query_hash bytea REFERENCES query_skeletons(hash),
+                    timestamp timestamptz NOT NULL,
+                    query_time_ms integer,
+                    query_variables text
+                )
+                """
+            )
+
+    async def get_most_frequent_queries_null_time(
        self, subgraph_ipfs_hash: str, min_count: int = 100
    ):
@@ -36,40 +61,150 @@ async def get_most_frequent_queries(
            rows = await connection.execute(
                sql.SQL(
                    """
-                    SELECT
-                        query,
-                        count_id,
-                        min_time,
-                        max_time,
-                        avg_time,
-                        stddev_time
-                    FROM
-                        query_skeletons
-                    INNER JOIN
-                    (
-                        SELECT
-                            query_hash as qhash,
-                            count(id) as count_id,
-                            Min(query_time_ms) as min_time,
-                            Max(query_time_ms) as max_time,
-                            Avg(query_time_ms) as avg_time,
-                            Stddev(query_time_ms) as stddev_time
-                        FROM
-                            query_logs
-                        WHERE
-                            subgraph = {hash}
-                            AND query_time_ms IS NOT NULL
-                        GROUP BY
-                            qhash
-                        HAVING
-                            Count(id) >= {min_count}
-                    ) as query_logs
-                    ON
-                        qhash = hash
-                    ORDER BY
-                        count_id DESC
+                    SELECT
+                        hash,
+                        query
+                    FROM
+                        query_skeletons
+                    INNER JOIN
+                    (
+                        SELECT
+                            query_hash as qhash,
+                            count(id) as count_id
+                        FROM
+                            query_logs
+                        WHERE
+                            subgraph = {subgraph}
+                            AND query_time_ms IS NULL
+                        GROUP BY
+                            qhash
+                        HAVING
+                            Count(id) >= {min_count}
+                    ) as query_logs
+                    ON
+                        qhash = hash
+                    ORDER BY
+                        count_id DESC
+                    """
+                ).format(subgraph=subgraph_ipfs_hash, min_count=min_count)
+            )
+            rows = await rows.fetchall()
+            return [
+                LogsDB.MRQ_Info(
+                    hash=row[0],
+                    query=self.return_query_body(row[1]) or "null",
+                    timestamp=self.timestamp,
+                )
+                for row in rows
+            ]
+
+    async def get_query_logs_id(self, hash):
+        async with self.pgpool.connection() as connection:
+            query_logs_ids = await connection.execute(
+                sql.SQL(
+                    """
+                    SELECT
+                        id
+                    FROM
+                        query_logs
+                    WHERE
+                        query_hash = {query_hash};
+                    """
+                ).format(query_hash=hash)
+            )
+            query_logs_ids = await query_logs_ids.fetchall()
+            return query_logs_ids
+
+    async def get_query_variables(self, id):
+        async with self.pgpool.connection() as connection:
+            query_variables = await connection.execute(
+                sql.SQL(
+                    """
+                    SELECT
+                        query_variables
+                    FROM
+                        query_logs
+                    WHERE
+                        id = {id};
+                    """
+                ).format(id=id)
+            )
+            query_variables = await query_variables.fetchall()
+            return json.loads(query_variables[0][0])
+
+    async def save_generated_aa_query_values(
+        self, query: MRQ_Info, subgraph: str, query_variables: list
+    ):
+        async with self.pgpool.connection() as connection:
+            await connection.execute(
+                sql.SQL(
+                    """
+                    INSERT INTO mrq_query_logs (
+                        subgraph,
+                        query_hash,
+                        timestamp,
+                        query_time_ms,
+                        query_variables
+                    )
+                    VALUES ({subgraph}, {query_hash}, {timestamp}, {query_time}, {query_vars})
+                    """
+                ).format(
+                    subgraph=subgraph,
+                    query_hash=query.hash,
+                    timestamp=query.timestamp,
+                    query_time=query.query_time_ms,
+                    query_vars=json.dumps(query_variables),
+                )
+            )
+
+    async def get_most_frequent_queries(
+        self, subgraph_ipfs_hash: str, table: str, min_count: int = 100
+    ):
+        async with self.pgpool.connection() as connection:
+            rows = await connection.execute(
+                sql.SQL(
+                    """
+                    SELECT
+                        query,
+                        count_id,
+                        min_time,
+                        max_time,
+                        avg_time,
+                        stddev_time
+                    FROM
+                        query_skeletons
+                    INNER JOIN
+                    (
+                        SELECT
+                            query_hash as qhash,
+                            count(id) as count_id,
+                            Min(query_time_ms) as min_time,
+                            Max(query_time_ms) as max_time,
+                            Avg(query_time_ms) as avg_time,
+                            Stddev(query_time_ms) as stddev_time
+                        FROM
+                            {table}
+                        WHERE
+                            subgraph = {hash}
+                            AND query_time_ms IS NOT NULL
+                        GROUP BY
+                            qhash
+                        HAVING
+                            Count(id) >= {min_count}
+                    ) as query_logs
+                    ON
+                        qhash = hash
+                    ORDER BY
+                        count_id DESC
                    """
-                ).format(hash=subgraph_ipfs_hash, min_count=str(min_count)),
+                ).format(
+                    table=sql.Identifier(table),
+                    hash=subgraph_ipfs_hash,
+                    min_count=str(min_count),
+                )
            )
            rows = await rows.fetchall()
            return [
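A note on the data shape these new accessors work with: each `query_variables` row holds a JSON array for one logged execution, and its positional entries line up with the `$_0, $_1, ...` variable names used in stored query skeletons. Roughly (sketch; values taken from the test fixtures further down, and the dict comprehension mirrors the one in `measure_query_time()` below):

```python
# How a stored query_variables column value becomes a GraphQL variables dict.
import json

stored = '["string_to_insert1", "mock_id1"]'  # one query_variables column value
query_variables = json.loads(stored)
query_variables_dict = {f"_{i}": var for i, var in enumerate(query_variables)}
assert query_variables_dict == {"_0": "string_to_insert1", "_1": "mock_id1"}
```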
diff --git a/autoagora/main.py b/autoagora/main.py
index 2b51c2a..6f225f8 100644
--- a/autoagora/main.py
+++ b/autoagora/main.py
@@ -11,7 +11,12 @@
 
 from autoagora.config import args, init_config
 from autoagora.indexer_utils import get_allocated_subgraphs, set_cost_model
-from autoagora.model_builder import apply_default_model, model_update_loop
+from autoagora.logs_db import LogsDB
+from autoagora.model_builder import (
+    apply_default_model,
+    model_update_loop,
+    mrq_model_update_loop,
+)
 from autoagora.price_multiplier import price_bandit_loop
 from autoagora.query_metrics import (
     K8SServiceWatcherMetricsEndpoints,
@@ -24,6 +29,7 @@ class SubgraphUpdateLoops:
     bandit: Optional[aio.Future] = None
     model: Optional[aio.Future] = None
+    mrq_model: Optional[aio.Future] = None
 
     def __del__(self):
-        for future in [self.bandit, self.model]:
+        for future in [self.bandit, self.model, self.mrq_model]:
@@ -59,6 +65,10 @@ async def allocated_subgraph_watcher():
             )
             raise
 
+    # Make sure the multi-root query (mrq) log table exists
+    logs_db = LogsDB(pgpool)
+    await logs_db.create_mrq_log_table()
+
     # Initialize indexer-service metrics endpoints
     if args.indexer_service_metrics_endpoint:  # static list
         metrics_endpoints = StaticMetricsEndpoints(
@@ -99,6 +109,16 @@ async def allocated_subgraph_watcher():
                 logging.info(
                     "Added model update loop for subgraph %s", new_subgraph
                 )
+                if args.multi_root_queries:
+                    # Launch the multi-root query (mrq) model update loop
+                    update_loops[new_subgraph].mrq_model = aio.ensure_future(
+                        mrq_model_update_loop(new_subgraph, pgpool)
+                    )
+                    logging.info(
+                        "Added multi-root query model update loop for subgraph %s",
+                        new_subgraph,
+                    )
 
             # Launch the price multiplier update loop for the new subgraph
             update_loops[new_subgraph].bandit = aio.ensure_future(
diff --git a/autoagora/model_builder.py b/autoagora/model_builder.py
index 37d1676..47942a1 100644
--- a/autoagora/model_builder.py
+++ b/autoagora/model_builder.py
@@ -4,20 +4,26 @@
 import asyncio as aio
 import logging
 import os
+import random
+import time
+from datetime import datetime
 from importlib.metadata import version
 
 import psycopg_pool
 from jinja2 import Template
 
 from autoagora.config import args
+from autoagora.graph_node_utils import query_graph_node
 from autoagora.indexer_utils import set_cost_model
 from autoagora.logs_db import LogsDB
 from autoagora.utils.constants import AGORA_ENTRY_TEMPLATE
 
 
 async def model_builder(subgraph: str, pgpool: psycopg_pool.AsyncConnectionPool) -> str:
     logs_db = LogsDB(pgpool)
-    most_frequent_queries = await logs_db.get_most_frequent_queries(subgraph)
+    most_frequent_queries = await logs_db.get_most_frequent_queries(
+        subgraph, table="query_logs"
+    )
     model = build_template(subgraph, most_frequent_queries)
     return model
@@ -27,11 +33,63 @@ async def apply_default_model(subgraph: str):
     await set_cost_model(subgraph, model)
 
 
+# (mrq) stands for multi-root query
+async def mrq_model_builder(
+    subgraph: str, pgpool: psycopg_pool.AsyncConnectionPool
+) -> str:
+    logs_db = LogsDB(pgpool)
+    # Fetch the most frequent multi-root queries (logged without a query time)
+    most_frequent_multi_root_queries = (
+        await logs_db.get_most_frequent_queries_null_time(subgraph)
+    )
+    for mrq_info in most_frequent_multi_root_queries:
+        await measure_query_time(subgraph, mrq_info, logs_db)
+    # Aggregate the measurements recorded above into per-query statistics
+    most_frequent_queries = await logs_db.get_most_frequent_queries(
+        subgraph, table="mrq_query_logs"
+    )
+    model = build_template(subgraph, most_frequent_queries)
+    return model
+
+
+# Measures the execution time of `iterations` runs of the query, each with a
+# randomly chosen logged variable set
+async def measure_query_time(
+    subgraph: str,
+    multi_root_query_info: LogsDB.MRQ_Info,
+    logs_db: LogsDB,
+    iterations: int = 100,
+):
+    # Fetch the query_logs ids holding variable sets for this query skeleton
+    query_log_ids = await logs_db.get_query_logs_id(multi_root_query_info.hash)
+    for _ in range(iterations):
+        query_id = random.choice(query_log_ids)[0]
+        query_variables = await logs_db.get_query_variables(query_id)
+        query_variables_dict = {f"_{i}": var for i, var in enumerate(query_variables)}
+        start_q_execution = time.monotonic()
+        await query_graph_node(multi_root_query_info.query, query_variables_dict)
+        multi_root_query_info.query_time_ms = int(
+            (time.monotonic() - start_q_execution) * 1000
+        )
+        multi_root_query_info.timestamp = datetime.now()
+        await logs_db.save_generated_aa_query_values(
+            multi_root_query_info, subgraph, query_variables
+        )
+
+
+async def mrq_model_update_loop(subgraph: str, pgpool):
+    while True:
+        model = await mrq_model_builder(subgraph, pgpool)
+        await set_cost_model(subgraph, model)
+        await aio.sleep(args.relative_query_costs_refresh_interval)
+
+
 async def model_update_loop(subgraph: str, pgpool):
     while True:
         model = await model_builder(subgraph, pgpool)
         await set_cost_model(subgraph, model)
         await aio.sleep(args.relative_query_costs_refresh_interval)
 
 
 def build_template(subgraph: str, most_frequent_queries=None):
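Taken together, `mrq_model_builder` does one full pass: select skeletons whose logs have `query_time_ms IS NULL`, re-execute each one `iterations` (default 100) times against graph-node with randomly drawn logged variables, then aggregate `mrq_query_logs` into an Agora model. A one-shot driver equivalent to a single turn of `mrq_model_update_loop` would look like this (sketch; assumes an open connection pool and a configured endpoint):

```python
# Sketch only: one iteration of the mrq loop, without the sleep.
import psycopg_pool

from autoagora.indexer_utils import set_cost_model
from autoagora.model_builder import mrq_model_builder


async def push_mrq_model_once(
    subgraph: str, pgpool: psycopg_pool.AsyncConnectionPool
) -> None:
    model = await mrq_model_builder(subgraph, pgpool)  # measure + aggregate
    await set_cost_model(subgraph, model)  # push to the indexer agent
```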
diff --git a/autoagora/utils/constants.py b/autoagora/utils/constants.py
index a2944c7..b5f5e11 100644
--- a/autoagora/utils/constants.py
+++ b/autoagora/utils/constants.py
@@ -3,6 +3,9 @@
 
 AGORA_DEFAULT_COST_MODEL = "default => $DEFAULT_COST * $GLOBAL_COST_MULTIPLIER;"
 
+# Log-normal parameters (seconds) for simulated query execution times
+MU = 0.4
+SIGMA = 0.2
+
 AGORA_ENTRY_TEMPLATE = """\
 # Generated by AutoAgora {{aa_version}}
 
@@ -20,3 +23,72 @@
 {% endfor %}
 default => $DEFAULT_COST * $GLOBAL_COST_MULTIPLIER;\
 """
+
+GET_MFQ_QUERY_LOGS = """\
+SELECT
+    query,
+    count_id,
+    min_time,
+    max_time,
+    avg_time,
+    stddev_time
+FROM
+    query_skeletons
+INNER JOIN
+(
+    SELECT
+        query_hash as qhash,
+        count(id) as count_id,
+        Min(query_time_ms) as min_time,
+        Max(query_time_ms) as max_time,
+        Avg(query_time_ms) as avg_time,
+        Stddev(query_time_ms) as stddev_time
+    FROM
+        query_logs
+    WHERE
+        subgraph = $1
+        AND query_time_ms IS NOT NULL
+    GROUP BY
+        qhash
+    HAVING
+        Count(id) >= $2
+) as query_logs
+ON
+    qhash = hash
+ORDER BY
+    count_id DESC
+"""
+
+GET_MFQ_MRQ_LOGS = """\
+SELECT
+    query,
+    count_id,
+    min_time,
+    max_time,
+    avg_time,
+    stddev_time
+FROM
+    query_skeletons
+INNER JOIN
+(
+    SELECT
+        query_hash as qhash,
+        count(id) as count_id,
+        Min(query_time_ms) as min_time,
+        Max(query_time_ms) as max_time,
+        Avg(query_time_ms) as avg_time,
+        Stddev(query_time_ms) as stddev_time
+    FROM
+        mrq_query_logs
+    WHERE
+        subgraph = $1
+        AND query_time_ms IS NOT NULL
+    GROUP BY
+        qhash
+    HAVING
+        Count(id) >= $2
+) as query_logs
+ON
+    qhash = hash
+ORDER BY
+    count_id DESC
+"""
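For reference, `random.lognormvariate(MU, SIGMA)` with the values above draws from a log-normal distribution with median e^MU ≈ 1.49 s and mean e^(MU + SIGMA²/2) ≈ 1.52 s; the tests below use it to fake graph-node response times. A quick sanity check (review aid, not part of the diff):

```python
# Verify the simulated query-time distribution empirically.
import math
import random

from autoagora.utils.constants import MU, SIGMA

print(math.exp(MU))  # median ~= 1.4918 s
print(math.exp(MU + SIGMA**2 / 2))  # mean ~= 1.5220 s
samples = [random.lognormvariate(MU, SIGMA) for _ in range(10_000)]
print(sum(samples) / len(samples))  # ~= 1.52 s empirically
```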
diff --git a/tests/test_logs_db.py b/tests/test_logs_db.py
index 955998b..fd95c67 100644
--- a/tests/test_logs_db.py
+++ b/tests/test_logs_db.py
@@ -62,7 +62,7 @@ async def pgpool(self, postgresql):
     async def test_get_most_frequent_queries_success(self, pgpool):
         ldb = LogsDB(pgpool)
         mfq = await ldb.get_most_frequent_queries(
-            "QmPnu3R7Fm4RmBF21aCYUohDmWbKd3VMXo64ACiRtwUQrn", 2
+            "QmPnu3R7Fm4RmBF21aCYUohDmWbKd3VMXo64ACiRtwUQrn", "query_logs", 2
         )
         assert mfq
         query = "query {\n  values {\n    id\n  }\n}"
@@ -84,7 +84,7 @@ async def test_get_most_frequent_queries_success(self, pgpool):
     async def test_get_most_frequent_queries_failed(self, pgpool):
         ldb = LogsDB(pgpool)
         mfq = await ldb.get_most_frequent_queries(
-            "QmTJBvvpknMow6n4YU8R9Swna6N8mHK8N2WufetysBiyuL"
+            "QmTJBvvpknMow6n4YU8R9Swna6N8mHK8N2WufetysBiyuL", "query_logs"
         )
         # empty array will be returned since min is default to 100
         assert mfq == []
diff --git a/tests/test_main.py b/tests/test_main.py
index 8e5a4bf..1e2eb9e 100644
--- a/tests/test_main.py
+++ b/tests/test_main.py
@@ -13,54 +13,58 @@ async def test_allocated_subgraph_watcher(self, postgresql):
         subgraph1 = "QmPnu3R7Fm4RmBF21aCYUohDmWbKd3VMXo64ACiRtwUQrn"
         subgraph2 = "QmTJBvvpknMow6n4YU8R9Swna6N8mHK8N2WufetysBiyuL"
         with mock.patch(
-            "autoagora.main.get_allocated_subgraphs"
-        ) as mock_get_allocated_subgraphs:
-            with mock.patch("autoagora.main.set_cost_model") as mock_set_cost_model:
-                with mock.patch(
-                    "autoagora.main.apply_default_model"
-                ) as mock_apply_default_model:  # pyright: ignore[reportUnusedVariable]
+            "autoagora.logs_db.LogsDB.create_mrq_log_table"
+        ) as mock_create_mrq_log_table:
+            with mock.patch(
+                "autoagora.main.get_allocated_subgraphs"
+            ) as mock_get_allocated_subgraphs:
+                with mock.patch("autoagora.main.set_cost_model") as mock_set_cost_model:
                     with mock.patch(
-                        "autoagora.main.model_update_loop"
-                    ) as mock_model_update_loop:
+                        "autoagora.main.apply_default_model"
+                    ) as mock_apply_default_model:  # pyright: ignore[reportUnusedVariable]
                         with mock.patch(
-                            "autoagora.main.price_bandit_loop"
-                        ) as mock_price_bandit_loop:
-                            init_config(
-                                [
-                                    "--indexer-agent-mgmt-endpoint",
-                                    "http://nowhere",
-                                    "--postgres-host",
-                                    postgresql.info.host,
-                                    "--postgres-username",
-                                    postgresql.info.user,
-                                    "--postgres-password",
-                                    postgresql.info.password,
-                                    "--postgres-port",
-                                    str(postgresql.info.port),
-                                    "--postgres-database",
-                                    postgresql.info.dbname,
-                                    "--indexer-service-metrics-endpoint",
-                                    "http://indexer-service.default.svc.cluster.local:7300/metrics",
-                                ]
-                            )
-                            mock_get_allocated_subgraphs.return_value = {
-                                subgraph1,
-                                subgraph2,
-                            }
+                            "autoagora.main.model_update_loop"
+                        ) as mock_model_update_loop:
+                            with mock.patch(
+                                "autoagora.main.price_bandit_loop"
+                            ) as mock_price_bandit_loop:
+                                init_config(
+                                    [
+                                        "--indexer-agent-mgmt-endpoint",
+                                        "http://nowhere",
+                                        "--postgres-host",
+                                        postgresql.info.host,
+                                        "--postgres-username",
+                                        postgresql.info.user,
+                                        "--postgres-password",
+                                        postgresql.info.password,
+                                        "--postgres-port",
+                                        str(postgresql.info.port),
+                                        "--postgres-database",
+                                        postgresql.info.dbname,
+                                        "--indexer-service-metrics-endpoint",
+                                        "http://indexer-service.default.svc.cluster.local:7300/metrics",
+                                    ]
+                                )
+                                mock_create_mrq_log_table.return_value = None
+                                mock_get_allocated_subgraphs.return_value = {
+                                    subgraph1,
+                                    subgraph2,
+                                }
 
-                            task = asyncio.create_task(allocated_subgraph_watcher())
-                            await asyncio.sleep(2)
-                            task.cancel()
+                                task = asyncio.create_task(allocated_subgraph_watcher())
+                                await asyncio.sleep(2)
+                                task.cancel()
 
-                            mock_get_allocated_subgraphs.assert_called_once()
-                            mock_set_cost_model.assert_any_call(
-                                subgraph1,
-                                variables=DEFAULT_AGORA_VARIABLES,
-                            )
-                            mock_set_cost_model.assert_any_call(
-                                subgraph2,
-                                variables=DEFAULT_AGORA_VARIABLES,
-                            )
-                            # Since there is no args for relative query cost the update_loop wont be called
-                            assert mock_model_update_loop.call_count == 0
-                            mock_price_bandit_loop.assert_called()
+                                mock_get_allocated_subgraphs.assert_called_once()
+                                mock_set_cost_model.assert_any_call(
+                                    subgraph1,
+                                    variables=DEFAULT_AGORA_VARIABLES,
+                                )
+                                mock_set_cost_model.assert_any_call(
+                                    subgraph2,
+                                    variables=DEFAULT_AGORA_VARIABLES,
+                                )
+                                # Since there are no relative query cost args, the update loop won't be called
+                                assert mock_model_update_loop.call_count == 0
+                                mock_price_bandit_loop.assert_called()
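One observation on the tests that follow: the `pgpool` fixture added to `tests/test_model_builder.py` below is duplicated verbatim in the new `tests/test_mrq.py`. A hypothetical follow-up (not part of this PR) could hoist it into `tests/conftest.py` so both test classes pick it up automatically:

```python
# Hypothetical tests/conftest.py sketch -- same body as the class-local
# fixtures below, shown abridged.
import psycopg_pool
import pytest


@pytest.fixture
async def pgpool(postgresql):
    conn_string = (
        f"host={postgresql.info.host} "
        f"dbname={postgresql.info.dbname} "
        f"user={postgresql.info.user} "
        f'password="{postgresql.info.password}" '
        f"port={postgresql.info.port}"
    )
    pool = psycopg_pool.AsyncConnectionPool(
        conn_string, min_size=2, max_size=10, open=False
    )
    await pool.open()
    await pool.wait()
    # ... create query_skeletons / query_logs / mrq_query_logs and seed rows,
    # exactly as in the class-local fixtures below ...
    yield pool
    await pool.close()
```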
diff --git a/tests/test_model_builder.py b/tests/test_model_builder.py
index 9f8046a..e56bee7 100644
--- a/tests/test_model_builder.py
+++ b/tests/test_model_builder.py
@@ -1,15 +1,107 @@
+import asyncio
+import json
 import os
+import random
 import re
 import tempfile
+from datetime import datetime
+from typing import Mapping, Optional
 from unittest import mock
 
+import psycopg_pool
+import pytest
+from psycopg import sql
+
 from autoagora.config import init_config
 from autoagora.logs_db import LogsDB
-from autoagora.model_builder import apply_default_model, build_template
+from autoagora.model_builder import (
+    apply_default_model,
+    build_template,
+    mrq_model_builder,
+)
 from tests.utils.constants import TEST_MANUAL_AGORA_ENTRY, TEST_QUERY_1, TEST_QUERY_2
 
 
+async def wait_random_time(query: str, variables: Optional[Mapping] = None):
+    await asyncio.sleep(random.lognormvariate(-4, 0.3))
+
+
 class TestModelBuilder:
+    @pytest.fixture
+    async def pgpool(self, postgresql):
+        conn_string = (
+            f"host={postgresql.info.host} "
+            f"dbname={postgresql.info.dbname} "
+            f"user={postgresql.info.user} "
+            f'password="{postgresql.info.password}" '
+            f"port={postgresql.info.port}"
+        )
+
+        pool = psycopg_pool.AsyncConnectionPool(
+            conn_string, min_size=2, max_size=10, open=False
+        )
+        await pool.open()
+        await pool.wait()
+        async with pool.connection() as conn:
+            await conn.execute(
+                """
+                CREATE TABLE query_skeletons (
+                    hash BYTEA PRIMARY KEY,
+                    query TEXT NOT NULL
+                )
+                """
+            )
+            await conn.execute(
+                """
+                CREATE TABLE query_logs (
+                    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
+                    query_hash BYTEA REFERENCES query_skeletons(hash),
+                    subgraph CHAR(46) NOT NULL,
+                    timestamp TIMESTAMPTZ NOT NULL,
+                    query_time_ms INTEGER,
+                    query_variables TEXT
+                )
+                """
+            )
+            await conn.execute(
+                """
+                CREATE TABLE IF NOT EXISTS mrq_query_logs (
+                    id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
+                    subgraph char(46) NOT NULL,
+                    query_hash bytea REFERENCES query_skeletons(hash),
+                    timestamp timestamptz NOT NULL,
+                    query_time_ms integer,
+                    query_variables text
+                )
+                """
+            )
+            await conn.execute(
+                """
+                INSERT INTO query_skeletons (hash, query)
+                VALUES ('hash1', 'query( $_0: string ){ info( id: $_1 ){ stat val }}'),
+                       ('hash2', 'query( $_0: string ){ info( id: $_1 ){ stat val }}')
+                """
+            )
+            await conn.execute(
+                sql.SQL(
+                    """
+                    INSERT INTO query_logs (query_hash, subgraph, timestamp, query_variables)
+                    VALUES ('hash1', 'QmPnu3R7Fm4RmBF21aCYUohDmWbKd3VMXo64ACiRtwUQrn', '2023-05-18T21:47:41+00:00', {mock1}),
+                           ('hash1', 'QmPnu3R7Fm4RmBF21aCYUohDmWbKd3VMXo64ACiRtwUQrn', '2023-05-18T21:47:41+00:00', {mock2}),
+                           ('hash2', 'QmTJBvvpknMow6n4YU8R9Swna6N8mHK8N2WufetysBiyuL', '2023-05-18T21:47:41+00:00', {mock3}),
+                           ('hash1', 'QmTJBvvpknMow6n4YU8R9Swna6N8mHK8N2WufetysBiyuL', '2023-05-18T21:47:41+00:00', {mock4})
+                    """
+                ).format(
+                    mock1=json.dumps(["string_to_insert1", "mock_id1"]),
+                    mock2=json.dumps(["string_to_insert2", "mock_id2"]),
+                    mock3=json.dumps(["string_to_insert3", "mock_id3"]),
+                    mock4=json.dumps(["string_to_insert4", "mock_id4"]),
+                )
+            )
+        yield pool
+        await pool.close()
+
     async def test_build_model(self, postgresql):
         init_config(
             [
@@ -112,6 +204,63 @@ async def test_build_model_with_manual_entry(self, postgresql):
         assert TEST_QUERY_2 in model
         assert TEST_MANUAL_AGORA_ENTRY in model
 
+    async def test_build_mrq_model(self, pgpool, postgresql):
+        with mock.patch(
+            "autoagora.logs_db.LogsDB.get_most_frequent_queries_null_time"
+        ) as mock_get_mrq_mfq:
+            with mock.patch(
+                "autoagora.model_builder.query_graph_node"
+            ) as mock_query_graph_node:
+                mock_get_mrq_mfq.return_value = [
+                    LogsDB.MRQ_Info(
+                        hash=b"hash1",
+                        query="query( $_0: string ){ info( id: $_1 ){ stat val }}",
+                        timestamp=datetime.now(),
+                    )
+                ]
+                mock_query_graph_node.side_effect = wait_random_time
+                init_config(
+                    [
+                        "--indexer-agent-mgmt-endpoint",
+                        "http://nowhere",
+                        "--postgres-host",
+                        postgresql.info.host,
+                        "--postgres-username",
+                        postgresql.info.user,
+                        "--postgres-password",
+                        postgresql.info.password,
+                        "--postgres-port",
+                        str(postgresql.info.port),
+                        "--postgres-database",
+                        postgresql.info.dbname,
+                        "--indexer-service-metrics-endpoint",
+                        "http://indexer-service.default.svc.cluster.local:7300/metrics",
+                    ]
+                )
+                model = await mrq_model_builder(
+                    "QmPnu3R7Fm4RmBF21aCYUohDmWbKd3VMXo64ACiRtwUQrn", pgpool
+                )
+
+                pattern = r"# Generated by AutoAgora \d+\.\d+\.\d+"
+                # Ensure a version string was rendered into the model header
+                assert re.match(
+                    pattern, model
+                ), f"{model} does not match pattern {pattern}"
+                async with pgpool.connection() as connection:
+                    mrq_data = await connection.execute(
+                        sql.SQL(
+                            """
+                            SELECT
+                                *
+                            FROM
+                                mrq_query_logs
+                            """
+                        )
+                    )
+                    mrq_data = await mrq_data.fetchall()
+                    assert mrq_data
+
     async def test_apply_default_model(self, postgresql):
         subgraph = "Qmadj8x9km1YEyKmRnJ6EkC2zpJZFCfTyTZpuqC3j6e1QH"
         file_type = ".agora"
diff --git a/tests/test_mrq.py b/tests/test_mrq.py
new file mode 100644
index 0000000..17f7bc8
--- /dev/null
+++ b/tests/test_mrq.py
@@ -0,0 +1,208 @@
+import asyncio as aio
+import json
+import random
+import re
+from datetime import datetime
+from unittest import mock
+
+import psycopg_pool
+import pytest
+from psycopg import sql
+
+from autoagora.config import init_config
+from autoagora.logs_db import LogsDB
+from autoagora.model_builder import measure_query_time, mrq_model_builder
+from autoagora.utils.constants import MU, SIGMA
+
+
+async def query_graph_node_mock_side(*args, **kwargs):
+    await aio.sleep(random.lognormvariate(MU, SIGMA))
+    return mock.MagicMock()
+
+
+class TestMRQ:
+    @pytest.fixture
+    async def pgpool(self, postgresql):
+        conn_string = (
+            f"host={postgresql.info.host} "
+            f"dbname={postgresql.info.dbname} "
+            f"user={postgresql.info.user} "
+            f'password="{postgresql.info.password}" '
+            f"port={postgresql.info.port}"
+        )
+
+        pool = psycopg_pool.AsyncConnectionPool(
+            conn_string, min_size=2, max_size=10, open=False
+        )
+        await pool.open()
+        await pool.wait()
+        async with pool.connection() as conn:
+            await conn.execute(
+                """
+                CREATE TABLE query_skeletons (
+                    hash BYTEA PRIMARY KEY,
+                    query TEXT NOT NULL
+                )
+                """
+            )
+            await conn.execute(
+                """
+                CREATE TABLE query_logs (
+                    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
+                    query_hash BYTEA REFERENCES query_skeletons(hash),
+                    subgraph CHAR(46) NOT NULL,
+                    timestamp TIMESTAMPTZ NOT NULL,
+                    query_time_ms INTEGER,
+                    query_variables TEXT
+                )
+                """
+            )
+            await conn.execute(
+                """
+                CREATE TABLE IF NOT EXISTS mrq_query_logs (
+                    id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
+                    subgraph char(46) NOT NULL,
+                    query_hash bytea REFERENCES query_skeletons(hash),
+                    timestamp timestamptz NOT NULL,
+                    query_time_ms integer,
+                    query_variables text
+                )
+                """
+            )
+            await conn.execute(
+                """
+                INSERT INTO query_skeletons (hash, query)
+                VALUES ('hash1', 'query( $_0: string ){ info( id: $_1 ){ stat val }}'),
+                       ('hash2', 'query( $_0: string ){ info( id: $_1 ){ stat val }}')
+                """
+            )
+            await conn.execute(
+                sql.SQL(
+                    """
+                    INSERT INTO query_logs (query_hash, subgraph, timestamp, query_variables)
+                    VALUES ('hash1', 'QmPnu3R7Fm4RmBF21aCYUohDmWbKd3VMXo64ACiRtwUQrn', '2023-05-18T21:47:41+00:00', {mock1}),
+                           ('hash1', 'QmPnu3R7Fm4RmBF21aCYUohDmWbKd3VMXo64ACiRtwUQrn', '2023-05-18T21:47:41+00:00', {mock2}),
+                           ('hash2', 'QmTJBvvpknMow6n4YU8R9Swna6N8mHK8N2WufetysBiyuL', '2023-05-18T21:47:41+00:00', {mock3}),
+                           ('hash1', 'QmTJBvvpknMow6n4YU8R9Swna6N8mHK8N2WufetysBiyuL', '2023-05-18T21:47:41+00:00', {mock4})
+                    """
+                ).format(
+                    mock1=json.dumps(["string_to_insert1", "mock_id1"]),
+                    mock2=json.dumps(["string_to_insert2", "mock_id2"]),
+                    mock3=json.dumps(["string_to_insert3", "mock_id3"]),
+                    mock4=json.dumps(["string_to_insert4", "mock_id4"]),
+                )
+            )
+        yield pool
+        await pool.close()
+
+    async def test_get_most_frequent_queries_success(self, pgpool):
+        ldb = LogsDB(pgpool)
+        mfq = await ldb.get_most_frequent_queries_null_time(
+            "QmPnu3R7Fm4RmBF21aCYUohDmWbKd3VMXo64ACiRtwUQrn", 2
+        )
+        assert mfq
+        query = "query {\n  info(id: $_1) {\n    stat\n    val\n  }\n}"
+        compare = LogsDB.MRQ_Info(
+            hash=b"hash1",
+            query=query,
+            timestamp=datetime(2023, 9, 19, 10, 59, 10, 463629),
+        )
+        assert mfq[0].query == compare.query
+        assert mfq[0].hash == compare.hash
+
+    async def test_get_measure_query_time(self, pgpool):
+        ldb = LogsDB(pgpool)
+        mfq = await ldb.get_most_frequent_queries_null_time(
+            "QmPnu3R7Fm4RmBF21aCYUohDmWbKd3VMXo64ACiRtwUQrn", 2
+        )
+        assert mfq
+        with mock.patch(
+            "autoagora.model_builder.query_graph_node",
+            side_effect=query_graph_node_mock_side,
+        ):
+            await measure_query_time(
+                "QmPnu3R7Fm4RmBF21aCYUohDmWbKd3VMXo64ACiRtwUQrn", mfq[0], ldb, 10
+            )
+            async with pgpool.connection() as conn:
+                response = await conn.execute(
+                    """
+                    SELECT subgraph, query_hash, timestamp, query_time_ms, query_variables
+                    FROM mrq_query_logs
+                    WHERE subgraph = 'QmPnu3R7Fm4RmBF21aCYUohDmWbKd3VMXo64ACiRtwUQrn'
+                    """
+                )
+                response = await response.fetchall()
+                # Check that 10 entries were made in the db
+                assert len(response) == 10
+
+    async def test_build_mrq_model(self, pgpool, postgresql):
+        # Exercises most of the pipeline; get_most_frequent_queries_null_time
+        # is mocked out since it takes too long when there are many logged values
+        with mock.patch(
+            "autoagora.logs_db.LogsDB.get_most_frequent_queries_null_time"
+        ) as get_mfq_null:
+            query = "query {\n  info(id: $_1) {\n    stat\n    val\n  }\n}"
+            mfq = [
+                LogsDB.MRQ_Info(
+                    hash=b"hash1",
+                    query=query,
+                    timestamp=datetime(2023, 9, 19, 10, 59, 10, 463629),
+                )
+            ]
+            get_mfq_null.return_value = mfq
+            with mock.patch(
+                "autoagora.model_builder.query_graph_node",
+                side_effect=query_graph_node_mock_side,
+            ):
+                init_config(
+                    [
+                        "--indexer-agent-mgmt-endpoint",
+                        "http://nowhere",
+                        "--postgres-host",
+                        postgresql.info.host,
+                        "--postgres-username",
+                        postgresql.info.user,
+                        "--postgres-password",
+                        postgresql.info.password,
+                        "--postgres-port",
+                        str(postgresql.info.port),
+                        "--postgres-database",
+                        postgresql.info.dbname,
+                        "--indexer-service-metrics-endpoint",
+                        "http://indexer-service.default.svc.cluster.local:7300/metrics",
+                    ]
+                )
+                model = await mrq_model_builder(
+                    "QmPnu3R7Fm4RmBF21aCYUohDmWbKd3VMXo64ACiRtwUQrn", pgpool
+                )
+                pattern = r"# Generated by AutoAgora \d+\.\d+\.\d+"
+                # Ensure a version string was rendered into the model header
+                assert re.match(
+                    pattern, model
+                ), f"{model} does not match pattern {pattern}"
+                async with pgpool.connection() as conn:
+                    mrq_data = await conn.execute(
+                        """
+                        SELECT
+                            *
+                        FROM
+                            mrq_query_logs
+                        WHERE
+                            subgraph = 'QmPnu3R7Fm4RmBF21aCYUohDmWbKd3VMXo64ACiRtwUQrn'
+                        """
+                    )
+                    mrq_data = await mrq_data.fetchall()
+                    # Should be 100, the default number of measurement iterations
+                    assert len(mrq_data) == 100
+
+    async def test_get_most_frequent_queries_failed(self, pgpool):
+        ldb = LogsDB(pgpool)
+        mfq = await ldb.get_most_frequent_queries(
+            "QmTJBvvpknMow6n4YU8R9Swna6N8mHK8N2WufetysBiyuL", "mrq_query_logs"
+        )
+        # an empty list is returned since min_count defaults to 100
+        assert mfq == []