
feat: Added multi root queries to the model #84

Open
wants to merge 11 commits into base: master
16 changes: 16 additions & 0 deletions autoagora/config.py
@@ -189,6 +189,22 @@ def init_config(argv: Optional[Sequence[str]] = None):
default=3600,
help="(Seconds) Interval between rebuilds of the relative query costs models.",
)
argparser_relative_query_costs.add_argument(
"--graph-node-query-endpoint",
env_var="GRAPH_NODE_QUERY_ENDPOINT",
required=False,
help="graph-node query endpoint. Used to re-execute queries.",
)
argparser_relative_query_costs.add_argument(
"--multi-root-queries",
env_var="MULTI_ROOT_QUERIES",
required=False,
type=bool,
default=False,
help="Enables the multi-root query evaluator: runs randomly selected query "
"roots from logged multi-root queries, to build a statistical model of their "
"execution time.",
)
#
# Manual agora entry values
#
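For illustration, the two new options can be driven entirely through the environment variables declared above. A minimal sketch (the endpoint URL is a placeholder, and the rest of the required AutoAgora configuration is assumed to be supplied as usual):

import os

from autoagora.config import args, init_config

# Placeholder endpoint: point this at the indexer's graph-node query port.
os.environ["GRAPH_NODE_QUERY_ENDPOINT"] = "http://graph-node:8000"
os.environ["MULTI_ROOT_QUERIES"] = "true"

init_config()  # remaining required options are assumed to come from CLI/env as usual
print(args.graph_node_query_endpoint, args.multi_root_queries)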
24 changes: 24 additions & 0 deletions autoagora/graph_node_utils.py
@@ -0,0 +1,24 @@
# Copyright 2022-, Semiotic AI, Inc.
# SPDX-License-Identifier: Apache-2.0

import logging
from typing import Mapping, Optional

import aiohttp
import backoff
from gql import Client, gql
from gql.transport.aiohttp import AIOHTTPTransport

from autoagora.config import args


@backoff.on_exception(
backoff.expo, aiohttp.ClientError, max_time=30, logger=logging.root
)
async def query_graph_node(query: str, variables: Optional[Mapping] = None):
async with Client(
transport=AIOHTTPTransport(args.graph_node_query_endpoint),
fetch_schema_from_transport=False,
) as session:
result = await session.execute(gql(query), variable_values=variables) # type: ignore
return result
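A minimal usage sketch of the helper above; the endpoint is taken from args.graph_node_query_endpoint, and the GraphQL query, field names, and variables are illustrative only:

import asyncio

from autoagora.config import init_config
from autoagora.graph_node_utils import query_graph_node


async def main():
    # Any logged query skeleton with its recorded variables could be
    # re-executed the same way.
    result = await query_graph_node(
        "query ($first: Int!) { tokens(first: $first) { id } }",
        variables={"first": 5},
    )
    print(result)


init_config()  # assumes GRAPH_NODE_QUERY_ENDPOINT / --graph-node-query-endpoint is set
asyncio.run(main())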
203 changes: 169 additions & 34 deletions autoagora/logs_db.py
@@ -1,6 +1,8 @@
# Copyright 2022-, Semiotic AI, Inc.
# SPDX-License-Identifier: Apache-2.0
import json
from dataclasses import dataclass
from datetime import datetime

import graphql
import psycopg_pool
@@ -17,8 +19,16 @@ class QueryStats:
avg_time: float
stddev_time: float

@dataclass
class MRQ_Info:
hash: bytes
query: str
timestamp: datetime
query_time_ms: int = 0

def __init__(self, pgpool: psycopg_pool.AsyncConnectionPool) -> None:
self.pgpool = pgpool
self.timestamp = datetime.now()

def return_query_body(self, query):
# Keep only query body -- ie. no var defs
@@ -28,48 +38,173 @@ def return_query_body(self, query):
query = "query " + graphql.print_ast(query)
return query

async def create_mrq_log_table(self) -> None:
async with self.pgpool.connection() as connection:
await connection.execute(
"""
CREATE TABLE IF NOT EXISTS mrq_query_logs (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
subgraph char(46) NOT NULL,
query_hash bytea REFERENCES query_skeletons(hash),
timestamp timestamptz NOT NULL,
query_time_ms integer,
query_variables text
)
"""
)

async def get_most_frequent_queries_null_time(
self, subgraph_ipfs_hash: str, min_count: int = 100
):

async with self.pgpool.connection() as connection:
rows = await connection.execute(
sql.SQL(
"""
SELECT
hash,
query

FROM
query_skeletons
INNER JOIN
(
SELECT
query_hash as qhash,
count(id) as count_id
FROM
query_logs
WHERE
subgraph = {subgraph}
AND query_time_ms IS NULL
GROUP BY
qhash
HAVING
Count(id) >= {min_count}
) as query_logs
ON
qhash = hash
ORDER BY
count_id DESC
"""
).format(subgraph=subgraph_ipfs_hash, min_count=min_count)
)
rows = await rows.fetchall()
return [
LogsDB.MRQ_Info(
hash=row[0],
query=self.return_query_body(row[1])
if self.return_query_body(row[1])
else "null",
timestamp=self.timestamp,
)
for row in rows
]

async def get_query_logs_id(self, hash):
async with self.pgpool.connection() as connection:
query_logs_ids = await connection.execute(
sql.SQL(
"""
SELECT
id
FROM
query_logs
WHERE
query_hash = {query_hash};
"""
).format(query_hash=hash)
)
query_logs_ids = await query_logs_ids.fetchall()
return query_logs_ids

async def get_query_variables(self, id: bytes):
async with self.pgpool.connection() as connection:
query_variables = await connection.execute(
sql.SQL(
"""
SELECT
query_variables
FROM
query_logs
WHERE
id = {id};
"""
).format(id=id)
)
query_variables = await query_variables.fetchall()
return json.loads(query_variables[0][0])

async def save_generated_aa_query_values(
self, query: MRQ_Info, subgraph: str, query_variables: list
):
async with self.pgpool.connection() as connection:
await connection.execute(
sql.SQL(
"""
INSERT INTO mrq_query_logs (
subgraph,
query_hash,
timestamp,
query_time_ms,
query_variables
)
VALUES ({subgraph}, {query_hash}, {timestamp}, {query_time}, {query_vars})
"""
).format(
subgraph=subgraph,
query_hash=query.hash,
timestamp=query.timestamp,
query_time=query.query_time_ms,
query_vars=json.dumps(query_variables),
)
)

async def get_most_frequent_queries(
self, subgraph_ipfs_hash: str, table: str, min_count: int = 100
):
async with self.pgpool.connection() as connection:
rows = await connection.execute(
sql.SQL(
"""
SELECT
query,
count_id,
min_time,
max_time,
avg_time,
stddev_time
FROM
query_skeletons
INNER JOIN
(
SELECT
query_hash as qhash,
count(id) as count_id,
Min(query_time_ms) as min_time,
Max(query_time_ms) as max_time,
Avg(query_time_ms) as avg_time,
Stddev(query_time_ms) as stddev_time
FROM
{table}
WHERE
subgraph = {hash}
AND query_time_ms IS NOT NULL
GROUP BY
qhash
HAVING
Count(id) >= {min_count}
) as query_logs
ON
qhash = hash
ORDER BY
count_id DESC
"""
).format(
table=sql.Identifier(table),
hash=subgraph_ipfs_hash,
min_count=str(min_count),
)
)
rows = await rows.fetchall()
return [
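Taken together, the new LogsDB methods support a measurement pass over multi-root queries whose execution time was never logged. The real orchestration lives in model_builder.mrq_model_update_loop, which is not part of this diff; the sketch below only illustrates how the pieces appear intended to compose, with the query timed client-side:

import time

from autoagora.graph_node_utils import query_graph_node
from autoagora.logs_db import LogsDB


async def measure_multi_root_queries(logs_db: LogsDB, subgraph: str) -> None:
    # Multi-root query skeletons that were logged without a query_time_ms.
    for mrq in await logs_db.get_most_frequent_queries_null_time(subgraph):
        for (log_id,) in await logs_db.get_query_logs_id(mrq.hash):
            variables = await logs_db.get_query_variables(log_id)
            start = time.monotonic()
            await query_graph_node(mrq.query, variables)  # re-execute against graph-node
            mrq.query_time_ms = int((time.monotonic() - start) * 1000)
            await logs_db.save_generated_aa_query_values(mrq, subgraph, variables)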
22 changes: 21 additions & 1 deletion autoagora/main.py
@@ -11,7 +11,12 @@

from autoagora.config import args, init_config
from autoagora.indexer_utils import get_allocated_subgraphs, set_cost_model
from autoagora.logs_db import LogsDB
from autoagora.model_builder import (
apply_default_model,
model_update_loop,
mrq_model_update_loop,
)
from autoagora.price_multiplier import price_bandit_loop
from autoagora.query_metrics import (
K8SServiceWatcherMetricsEndpoints,
@@ -24,6 +29,7 @@
class SubgraphUpdateLoops:
bandit: Optional[aio.Future] = None
model: Optional[aio.Future] = None
mrq_model: Optional[aio.Future] = None

def __del__(self):
for future in [self.bandit, self.model]:
@@ -59,6 +65,10 @@ async def allocated_subgraph_watcher():
)
raise

# Initialize the multi-root query (mrq) log table
logsDB = LogsDB(pgpool)
await logsDB.create_mrq_log_table()

# Initialize indexer-service metrics endpoints
if args.indexer_service_metrics_endpoint: # static list
metrics_endpoints = StaticMetricsEndpoints(
@@ -99,6 +109,16 @@
logging.info(
"Added model update loop for subgraph %s", new_subgraph
)
if args.multi_root_queries:
# Add the multi-root query (mrq) model update loop
update_loops[new_subgraph].mrq_model = aio.ensure_future(
mrq_model_update_loop(new_subgraph, pgpool)
)
logging.info(
"Added model update loop for subgraph %s", new_subgraph
)

# Launch the price multiplier update loop for the new subgraph
update_loops[new_subgraph].bandit = aio.ensure_future(
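mrq_model_update_loop itself is defined in autoagora/model_builder.py and is not shown in this diff. Presumably it repeats a measure-and-rebuild pass per subgraph, roughly along the lines below (the interval and the helper name are assumptions, not the PR's code):

import asyncio as aio

from autoagora.logs_db import LogsDB


async def mrq_model_update_loop(subgraph: str, pgpool) -> None:
    logs_db = LogsDB(pgpool)
    while True:
        # Hypothetical helper; see the sketch after the logs_db.py changes above.
        await measure_multi_root_queries(logs_db, subgraph)
        await aio.sleep(3600)  # assumed to mirror the default rebuild interval in config.py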