-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* Added BasicUpstream * Added configuration * Added integgration tests * Addrtessed comments
- Loading branch information
Showing
8 changed files
with
329 additions
and
67 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
from typing import Dict | ||
|
||
from aiohttp import BasicAuth | ||
from aiohttp.hdrs import AUTHORIZATION | ||
|
||
from .upstream import Upstream | ||
|
||
|
||
class BasicUpstream(Upstream): | ||
def __init__(self, *, username: str, password: str) -> None: | ||
self._username = username | ||
self._password = password | ||
|
||
async def _get_headers(self) -> Dict[str, str]: | ||
auth = BasicAuth(login=self._username, password=self._password) | ||
return {str(AUTHORIZATION): auth.encode()} | ||
|
||
async def get_headers_for_version(self) -> Dict[str, str]: | ||
return await self._get_headers() | ||
|
||
async def get_headers_for_catalog(self) -> Dict[str, str]: | ||
return await self._get_headers() | ||
|
||
async def get_headers_for_repo(self, repo: str) -> Dict[str, str]: | ||
return await self._get_headers() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
from contextlib import asynccontextmanager | ||
from typing import AsyncIterator | ||
|
||
import aiohttp.web | ||
import pytest | ||
from aiohttp import BasicAuth, hdrs, web | ||
from aiohttp.test_utils import unused_port | ||
from aiohttp.web import Application, HTTPOk, Request, Response, json_response | ||
from yarl import URL | ||
|
||
from platform_registry_api.api import create_app | ||
from platform_registry_api.config import ( | ||
AuthConfig, | ||
Config, | ||
EnvironConfigFactory, | ||
ServerConfig, | ||
UpstreamRegistryConfig, | ||
UpstreamType, | ||
ZipkinConfig, | ||
) | ||
|
||
|
||
pytestmark = pytest.mark.asyncio | ||
|
||
|
||
@pytest.fixture | ||
async def raw_client() -> AsyncIterator[aiohttp.ClientSession]: | ||
async with aiohttp.ClientSession() as session: | ||
yield session | ||
|
||
|
||
@asynccontextmanager | ||
async def create_local_app_server( | ||
app: aiohttp.web.Application, host: str = "0.0.0.0", port: int = 8080 | ||
) -> AsyncIterator[URL]: | ||
runner = aiohttp.web.AppRunner(app) | ||
try: | ||
await runner.setup() | ||
site = aiohttp.web.TCPSite(runner, host, port) | ||
await site.start() | ||
yield URL(site.name) | ||
finally: | ||
await runner.shutdown() | ||
await runner.cleanup() | ||
|
||
|
||
class _TestUpstreamHandler: | ||
async def handle_catalog(self, request: Request) -> Response: | ||
auth_header_value = request.headers[hdrs.AUTHORIZATION] | ||
assert BasicAuth.decode(auth_header_value) == BasicAuth( | ||
login="testuser", password="testpassword" | ||
) | ||
return json_response({"repositories": []}) | ||
|
||
|
||
@pytest.fixture | ||
def handler() -> _TestUpstreamHandler: | ||
return _TestUpstreamHandler() | ||
|
||
|
||
@pytest.fixture | ||
async def upstream(handler: _TestUpstreamHandler) -> AsyncIterator[URL]: | ||
app = Application() | ||
app.add_routes([web.get("/v2/_catalog", handler.handle_catalog)]) | ||
|
||
async with create_local_app_server(app, port=unused_port()) as url: | ||
yield url | ||
|
||
|
||
@pytest.fixture | ||
def auth_config(in_docker: bool, admin_token: str) -> AuthConfig: | ||
if in_docker: | ||
return EnvironConfigFactory().create().auth | ||
return AuthConfig( | ||
server_endpoint_url=URL("http://localhost:5003"), service_token=admin_token | ||
) | ||
|
||
|
||
@pytest.fixture | ||
def config(upstream: URL, auth_config: AuthConfig) -> Config: | ||
upstream_registry = UpstreamRegistryConfig( | ||
type=UpstreamType.BASIC, | ||
endpoint_url=upstream, | ||
project="testproject", | ||
basic_username="testuser", | ||
basic_password="testpassword", | ||
) | ||
zipkin_config = ZipkinConfig(URL("http://zipkin:9411"), 0) | ||
return Config( | ||
server=ServerConfig(), | ||
upstream_registry=upstream_registry, | ||
auth=auth_config, | ||
zipkin=zipkin_config, | ||
cluster_name="test-cluster", | ||
) | ||
|
||
|
||
class TestBasicUpstream: | ||
async def test_catalog(self, config, regular_user_factory, aiohttp_client) -> None: | ||
app = await create_app(config) | ||
client = await aiohttp_client(app) | ||
user = await regular_user_factory() | ||
|
||
async with client.get("/v2/_catalog", auth=user.to_basic_auth()) as resp: | ||
assert resp.status == HTTPOk.status_code, await resp.text() | ||
payload = await resp.json() | ||
assert payload == {"repositories": []} |
Oops, something went wrong.