Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENH: Avoid blocking thread on certain requests #52

Draft
wants to merge 15 commits into
base: main
Choose a base branch
from
9 changes: 7 additions & 2 deletions .github/workflows/build-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ jobs:
echo "ARCHIVE=$ARCHIVE" >> $GITHUB_ENV
- name: Install package and test dependencies
run: python -m pip install $ARCHIVE[test]
- uses: actions/checkout@v3
- uses: actions/checkout@v4
with:
repository: 'nipreps/migas-server'
path: 'migas-server'
Expand All @@ -100,11 +100,16 @@ jobs:
env:
MIGAS_BYPASS_RATE_LIMIT: '1'
- name: Verify server is available
run: docker port ${MIGAS_SERVER_NAME} && sleep 10
run: docker port ${MIGAS_SERVER_NAME} && sleep 10 && docker ps
env:
MIGAS_SERVER_NAME: app
- name: Run tests
env:
MIGAS_FRESH_DB: '1'
run: python -m pytest -sv --doctest-modules --pyargs migas
- name: Show server logs (debug)
if: always()
run: docker container logs app
- name: Stop local server
run: docker compose -f migas-server/docker-compose.yml down

Expand Down
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import migas; migas.setup(endpoint='your-endpoint')

`migas` includes the following functions to communicate with the telemetry server:

### migas.add_breadcrumb()
### [migas.add_breadcrumb] (#add-breadcrumb)
---
Send a breadcrumb with usage information to the server.

Expand Down Expand Up @@ -57,12 +57,12 @@ Send a breadcrumb with usage information to the server.

```python
>>> add_breadcrumb('nipreps/migas-py', '0.0.1', status='R', status_desc='Finished long step')
{'success': True}
>>>
```

</details>

### migas.check_project()
### [migas.check_project] (#check-project)
---
Check a project version against later developments.

Expand All @@ -81,7 +81,7 @@ Check a project version against later developments.

</details>

### migas.get_usage()
### [migas.get_usage] (#get-usage)
---
Check number of uses a `project` has received from a start date, and optionally an end date.
If no end date is specified, the current datetime is used.
Expand All @@ -97,11 +97,11 @@ If no end date is specified, the current datetime is used.
</details>


### migas.track_exit()
### [migas.track_exit] (#track-exit)
---
Register an exit function to send a final ping upon termination of the Python interpretter.
Useful when monitoring a process that may preemptively error.
The inputs are equivalent to [`add_breadcrumb()`](#addbreadcrumb)
The inputs are equivalent to [`add_breadcrumb()`](#add-breadcrumb)

## User Control

Expand Down
80 changes: 55 additions & 25 deletions migas/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,28 +76,34 @@ class AddBreadcrumb(Operation):


@telemetry_enabled
def add_breadcrumb(project: str, project_version: str, **kwargs) -> dict:
def add_breadcrumb(project: str, project_version: str, wait: bool = False, **kwargs) -> dict | None:
"""
Send a breadcrumb with usage information to the telemetry server.

- `project` - application name
- `project_version` - application version

Optional keyword arguments
- `language` (auto-detected)
- `language_version` (auto-detected)
- process-specific
- `status`
- `status_desc`
- `error_type`
- `error_desc`
- context-specific
- `user_id` (auto-generated)
- `session_id`
- `user_type`
- `platform` (auto-detected)
- `container` (auto-detected)
- `is_ci` (auto-detected)
Parameters
----------
project : str
Project name, formatted in GitHub `<owner>/<repo>` convention
project_version : str
Version string
wait : bool, default=False
If enable, wait for server response.
**kwargs
Additional usage information to send. Includes:
- `language` (auto-detected)
- `language_version` (auto-detected)
- process-specific
- `status`
- `status_desc`
- `error_type`
- `error_desc`
- context-specific
- `user_id` (auto-generated)
- `session_id`
- `user_type`
- `platform` (auto-detected)
- `container` (auto-detected)
- `is_ci` (auto-detected)

Returns
-------
Expand All @@ -108,9 +114,11 @@ def add_breadcrumb(project: str, project_version: str, **kwargs) -> dict:
project=project, project_version=project_version, **kwargs
)
logger.debug(query)
_, response = request(Config.endpoint, query=query)
res = _filter_response(response, AddBreadcrumb.operation_name, AddBreadcrumb.error_response)
return res
res = request(Config.endpoint, query=query, wait=wait)
if wait:
logger.debug(res)
res = _filter_response(res[1], AddBreadcrumb.operation_name, AddBreadcrumb.error_response)
return res


class AddProject(Operation):
Expand Down Expand Up @@ -169,7 +177,8 @@ def add_project(project: str, project_version: str, **kwargs) -> dict:
)
query = AddProject.generate_query(project=project, project_version=project_version, **kwargs)
logger.debug(query)
_, response = request(Config.endpoint, query=query)
_, response = request(Config.endpoint, query=query, wait=True)
logger.debug(response)
res = _filter_response(response, AddProject.operation_name, AddProject.error_response)
return res

Expand Down Expand Up @@ -211,7 +220,8 @@ def check_project(project: str, project_version: str, **kwargs) -> dict:
"""
query = CheckProject.generate_query(project=project, project_version=project_version, **kwargs)
logger.debug(query)
_, response = request(Config.endpoint, query=query)
_, response = request(Config.endpoint, query=query, wait=True)
logger.debug(response)
res = _filter_response(response, CheckProject.operation_name)
return res

Expand All @@ -229,9 +239,29 @@ class GetUsage(Operation):

@telemetry_enabled
def get_usage(project: str, start: str, **kwargs) -> dict:
"""Retrieve usage statistics from the migas server.

Parameters
----------
project : str
Project name, formatted in GitHub `<owner>/<repo>` convention
start : str
Start of data collection. Supports the following formats:
`YYYY-MM-DD`
`YYYY-MM-DDTHH:MM:SSZ'
kwargs
Additional arguments for the query
end: End range of data collection. Same formats as `start`.
unique: Filter out hits from same user_id.

Returns
response : dict
success, hits, unique, message
"""
query = GetUsage.generate_query(project=project, start=start, **kwargs)
logger.debug(query)
_, response = request(Config.endpoint, query=query)
_, response = request(Config.endpoint, query=query, wait=True)
logger.debug(response)
res = _filter_response(response, GetUsage.operation_name)
return res

Expand Down
35 changes: 32 additions & 3 deletions migas/request.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
"""Stripped down, minimal import way to communicate with server"""
from __future__ import annotations

import json
import os
import warnings
from typing import Optional, Tuple, Union
from http.client import HTTPConnection, HTTPResponse, HTTPSConnection
from urllib.parse import urlparse
from concurrent.futures import ThreadPoolExecutor

from . import __version__
from .config import logger

ETResponse = Tuple[int, Union[dict, str]] # status code, body

Expand All @@ -20,6 +22,29 @@


def request(
url: str,
*,
query: str = None,
timeout: float = None,
method: str = "POST",
chunk_size: int | None = None,
wait: bool = False,
) -> None:
"""
Send a non-blocking call to the server.

This will never check the future, and no assumptions can be made about server receptivity.
"""
with ThreadPoolExecutor() as executor:
future = executor.submit(
_request, url, query=query, timeout=timeout, method=method, chunk_size=chunk_size,
)

if wait is True:
return future.result()


def _request(
url: str,
*,
query: str = None,
Expand Down Expand Up @@ -73,11 +98,15 @@ def request(
finally:
conn.close()

if body and response.headers.get("content-type").startswith("application/json"):
if body and response.headers.get("content-type", "").startswith("application/json"):
body = json.loads(body)

if not response.headers.get("X-Backend-Server"):
logger.warning("migas server is incorrectly configured.")
warnings.warn(
"migas server is incorrectly configured.",
UserWarning,
stacklevel=1,
)
return response.status, body


Expand Down
70 changes: 27 additions & 43 deletions migas/tests/test_operations.py
Original file line number Diff line number Diff line change
@@ -1,58 +1,61 @@
from datetime import datetime as dt
from datetime import timedelta
from datetime import timezone as tz
import time

from looseversion import LooseVersion
import pytest

from migas import __version__
import migas
from migas.operations import (
add_breadcrumb,
add_project,
check_project,
get_usage,
)

from .utils import do_server_tests
from .utils import run_server_tests

# skip all tests in module if server is not available
pytestmark = pytest.mark.skipif(not do_server_tests, reason="Local server not found")
pytestmark = pytest.mark.skipif(not run_server_tests, reason="Local server not found")

test_project = 'nipreps/migas-py'
today = dt.now(tz.utc)
future = (today + timedelta(days=2)).strftime('%Y-%m-%d')
today = today.strftime('%Y-%m-%d')


def test_operations(setup_migas):
_test_add_breakcrumb()
# add delay to ensure server has updated
time.sleep(2)
_test_get_usage()
TEST_ROOT = "http://localhost:8080/"
TEST_ENDPOINT = f"{TEST_ROOT}graphql"

def _test_add_breakcrumb():
res = add_breadcrumb(test_project, __version__)
assert res['success'] is True


@pytest.fixture(autouse=True, scope='module')
def setup_migas():
"""Ensure migas is configured to communicate with the staging app."""
migas.setup(endpoint=TEST_ENDPOINT)

assert migas.config.Config._is_setup
return migas.config.Config._is_setup



def test_migas_add_get():
res = add_breadcrumb(test_project, migas.__version__)
# ensure kwargs can be submitted
res = add_breadcrumb(test_project, __version__, language='cpython', platform='win32')
res = add_breadcrumb(test_project, migas.__version__, wait=True, language='cpython', platform='win32')
assert res['success'] is True

# validation should happen instantly
res = add_breadcrumb(test_project, __version__, status='wtf')
# this breadcrumb is not valid, so won't be tracked
res = add_breadcrumb(test_project, migas.__version__, wait=True, status='wtf')
assert res['success'] is False

def _test_get_usage():
"""This test requires `_test_add_breadcrumb()` to be run before."""
# 2 crumbs should be present on the server, both from the same user
res = get_usage(test_project, start=today)
assert res['success'] is True
all_usage = res['hits']
assert all_usage > 0
assert all_usage == 2

res = get_usage(test_project, start=today, unique=True)
assert res['success'] is True
assert all_usage >= res['hits'] > 0
assert all_usage > res['hits'] > 0

res = get_usage(test_project, start=future)
assert res['success'] is True
Expand All @@ -64,30 +67,11 @@ def _test_get_usage():
assert res['hits'] == 0


def test_add_project(setup_migas):
res = add_project(test_project, __version__)
assert res['success'] is True
latest = res['latest_version']
assert latest

# ensure kwargs can be submitted
res = add_project(test_project, __version__, language='cpython', platform='win32')
assert res['success'] is True
assert res['latest_version'] == latest
# should be cached since we just checked the version
assert res['cached'] is True

# illegal queries should fail
res = add_project(test_project, __version__, status='wtf')
assert res['success'] is False
assert res['latest_version'] is None


def test_check_project(setup_migas):
res = check_project(test_project, __version__)
def test_check_project():
res = check_project(test_project, migas.__version__)
assert res['success'] is True
assert res['latest']
v = LooseVersion(__version__)
v = LooseVersion(migas.__version__)
latest = LooseVersion(res['latest'])
assert v >= latest
assert res['flagged'] is False
Expand Down
Loading
Loading