Skip to content

Commit

Permalink
Merge pull request #13 from AndrewSergienko/develop
Browse files Browse the repository at this point in the history
Change replication file functional
  • Loading branch information
andiserg authored Aug 9, 2023
2 parents c161774 + 84ca633 commit b8de058
Show file tree
Hide file tree
Showing 7 changed files with 25 additions and 28 deletions.
4 changes: 1 addition & 3 deletions src/abstract/adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ async def download_and_save_file(
pass

@abstractmethod
async def upload_file(
self, server: Server, file_info: FileInfo, chunk_iterator: AsyncIterable
):
async def upload_file(self, server: Server, files_dir: Path, file_info: FileInfo):
pass

@abstractmethod
Expand Down
18 changes: 13 additions & 5 deletions src/adapters.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import json
import os
from datetime import datetime
Expand Down Expand Up @@ -27,12 +28,19 @@ async def download_and_save_file(
return FileInfo(name=file_name, file_type=file_type, origin_url=link)

async def upload_file(
self, server: Server, file_info: FileInfo, chunk_iterator
self, server: Server, files_dir: Path, file_info: FileInfo
) -> dict:
headers = {"FILE-NAME": f"{file_info.name}.{file_info.file_type}"}
async with ClientSession(headers=headers) as session:
async with session.put(f"{server.url}/files/", data=chunk_iterator) as resp:
return {"server": server, "status": resp.status}
file_name = f"{file_info.name}.{file_info.file_type}"
server_ip = server.url.strip("http://")
process = await asyncio.create_subprocess_exec(
"scp",
"-o",
"StrictHostKeyChecking=no",
files_dir / file_name,
f"{server_ip}:files/",
)
await process.wait()
return {"server": server}

async def send_file_status(self, origin_url: str, status: dict):
async with ClientSession() as session:
Expand Down
3 changes: 1 addition & 2 deletions src/domain/events.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from dataclasses import dataclass
from datetime import datetime
from typing import AsyncIterable

from src.domain.model import FileInfo, Server

Expand All @@ -18,7 +17,7 @@ class FileProcessedEvent(Event):

@dataclass
class FileSavedEvent(FileProcessedEvent):
chunk_iterator: AsyncIterable
pass


@dataclass
Expand Down
11 changes: 3 additions & 8 deletions src/services/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,11 @@ async def download_and_save_file(
file_info = await context.web.download_and_save_file(
link, files_dir, file_name, save_file_function
)
chunk_iter = await get_chunk_iterator_factory(
context, f"{file_name}.{file_info.file_type}"
)
event = events.FileSavedEvent(
file_info, timer.execution_time, datetime.now(), chunk_iter
)
event = events.FileSavedEvent(file_info, timer.execution_time, datetime.now())
await publish_event(context, event)


async def get_chunk_iterator_factory(context: AContext, file_name: str):
async def get_chunk_iterator(context: AContext, file_name: str):
"""
Get a chunk iterator for the specified file.
Expand All @@ -62,7 +57,7 @@ async def get_chunk_iterator_factory(context: AContext, file_name: str):
"""
file_path = context.FILES_DIR / file_name
chunk_size = int(await context.env.get("CHUNK_SIZE"))
return context.files.get_chunk_iterator_factory(file_path, chunk_size)
return context.files.get_chunk_iterator(file_path, chunk_size)


async def save_file(
Expand Down
9 changes: 5 additions & 4 deletions src/services/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@ async def replicate_file(context: AContext, event: FileSavedEvent):
servers = await context.servers.get_servers(context.ROOT_DIR)
server_name = await context.env.get("NAME")
servers = list(filter(lambda server: server.name != server_name, servers))
# create uploading tasks
files_dir = context.FILES_DIR
tasks = [
context.web.upload_file(server, event.file_info, event.chunk_iterator())
context.web.upload_file(server, files_dir, event.file_info)
for server in servers
]

start_time = datetime.now()
for task in asyncio.as_completed(tasks):
# get result of finished task
Expand Down Expand Up @@ -74,6 +75,6 @@ def get_status_from_event(


EVENT_HANDLERS = {
events.FileSavedEvent: [replicate_file, send_saved_file_status],
events.FileReplicatedEvent: [send_replicated_file_status],
events.FileSavedEvent: [replicate_file],
events.FileReplicatedEvent: [],
}
5 changes: 1 addition & 4 deletions tests/test_adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,7 @@ async def test_web_client_upload_file_with_correct_data_should_upload(
server = Server(name="TestVPS", url=f"http://146.190.84.172", zone="test")
file_info = FileInfo(name="aboba", file_type="json", origin_url="test")

response = await context.web.upload_file(
server, file_info, test_chunk_iterator_upload
)
assert response["status"] == 200
response = await context.web.upload_file(server, context.FILES_DIR, file_info)
assert response["server"] == server


Expand Down
3 changes: 1 addition & 2 deletions tests/test_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@

@pytest.mark.asyncio
async def test_replication_file_with_file_should_publish_event(fake_context):
chunk_iterator = await fake_context.files.get_chunk_iterator_factory(Path(), 10)
file_info = FileInfo(name="text", file_type="txt", origin_url="test")
event = FileSavedEvent(file_info, 0, datetime.now(), chunk_iterator)
event = FileSavedEvent(file_info, 0, datetime.now())
await replicate_file(fake_context, event)

assert isinstance(fake_context.events.events[-1], FileReplicatedEvent)
Expand Down

0 comments on commit b8de058

Please sign in to comment.