Skip to content

Commit

Permalink
Add support for remote-path in uploads and downloads
Browse files Browse the repository at this point in the history
  • Loading branch information
kouylekov-usit committed Oct 25, 2024
1 parent 5f544b1 commit fb2fe5c
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 39 deletions.
125 changes: 96 additions & 29 deletions tsdapiclient/fileapi.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@

"""TSD File API client."""

import sys
import hashlib
import json
import os
Expand Down Expand Up @@ -115,19 +115,26 @@ def format_filename(filename: str) -> str:
return os.path.basename(filename)


def upload_resource_name(
    filename: str,
    is_dir: bool,
    group: Optional[str] = None,
    remote_path: Optional[str] = None,
) -> str:
    """
    Build the resource part of an upload URL.

    Parameters
    ----------
    filename: local path of the file being uploaded
    is_dir: True when the file is uploaded as part of a directory, in
        which case the relative path of the file is preserved
    group: group which should own the uploaded resource
    remote_path: optional remote directory to upload into; it is
        normalised to have exactly one leading and one trailing '/'

    Returns
    -------
    str, the resource name: '{group}{remote dir}{quoted name}', where the
    remote dir defaults to '/'

    """
    # Normalise the remote directory to '/' or '/a/b/' so that a value given
    # without its leading or trailing slash cannot fuse with the group or
    # with the file name.
    if remote_path:
        inner = remote_path.strip('/')
        prefix = f'/{inner}/' if inner else '/'
    else:
        prefix = '/'

    if not is_dir:
        debug_step('uploading file')
        name = quote(format_filename(filename))
        if group:
            return f'{group}{prefix}{name}'
        # Without a group and without a remote directory, return the bare
        # name: callers prepend 'stream/' themselves, so a leading slash
        # here would produce 'stream//name'.
        return f'{prefix}{name}' if remote_path else name

    debug_step('uploading directory (file)')
    # Drop a leading slash so an absolute local path does not introduce an
    # empty path segment in the resource.
    target = filename[1:] if filename.startswith('/') else filename
    # NOTE(review): group is interpolated as-is, so group=None yields a
    # literal 'None' segment - presumably directory uploads always pass a
    # group; confirm against the callers.
    return f'{group}{prefix}{quote(target)}'


Expand Down Expand Up @@ -221,6 +228,7 @@ def streamfile(
api_key: Optional[str] = None,
refresh_token: Optional[str] = None,
refresh_target: Optional[int] = None,
remote_path: Optional[str] = None
) -> dict:
"""
Idempotent, lazy data upload from files.
Expand All @@ -247,7 +255,7 @@ def streamfile(
"""
tokens = maybe_refresh(env, pnum, api_key, token, refresh_token, refresh_target)
token = tokens.get('access_token') if tokens else token
resource = upload_resource_name(filename, is_dir, group=group)
resource = upload_resource_name(filename, is_dir, group=group, remote_path=remote_path)
endpoint=f"stream/{resource}?group={group}"
url = f'{file_api_url(env, pnum, backend, endpoint=endpoint)}'
headers = {'Authorization': f'Bearer {token}'}
Expand Down Expand Up @@ -308,9 +316,10 @@ def import_list(
page: Optional[str] = None,
group: Optional[str] = None,
per_page: Optional[int] = None,
remote_path: Optional[str] = None,
) -> dict:
"""
Get the list of files in the import direcctory, for a given group.
Get the list of files in the import directory, for a given group.
Parameters
----------
Expand All @@ -325,8 +334,11 @@ def import_list(
per_page: number of files to list per page
"""
resource = f'/{directory}' if directory else ''
endpoint=f"stream/{group}{resource}"
resource = directory if directory else ''
if remote_path:
endpoint=f"stream/{group}{remote_path}{resource}"
else:
endpoint=f"stream/{group}{resource}"
url = f'{file_api_url(env, pnum, backend, endpoint=endpoint , page=page, per_page=per_page)}'
headers = {'Authorization': 'Bearer {0}'.format(token)}
debug_step(f'listing resources at {url}')
Expand Down Expand Up @@ -388,10 +400,14 @@ def import_delete(
api_key: Optional[str] = None,
refresh_token: Optional[str] = None,
refresh_target: Optional[int] = None,
remote_path: Optional[str] = None,
) -> requests.Response:
tokens = maybe_refresh(env, pnum, api_key, token, refresh_token, refresh_target)
token = tokens.get("access_token") if tokens else token
endpoint = f'stream/{group}/{filename}'
if remote_path:
endpoint = f'stream/{group}{remote_path}{filename}'
else:
endpoint = f'stream/{group}{filename}'
url = f'{file_api_url(env, pnum, "files", endpoint=endpoint)}'
headers = {'Authorization': f'Bearer {token}'}
print(f'deleting: {filename}')
Expand All @@ -410,10 +426,14 @@ def export_delete(
api_key: Optional[str] = None,
refresh_token: Optional[str] = None,
refresh_target: Optional[int] = None,
remote_path: Optional[str] = None,
) -> requests.Response:
tokens = maybe_refresh(env, pnum, api_key, token, refresh_token, refresh_target)
token = tokens.get("access_token") if tokens else token
endpoint = f'export/{filename}'
if remote_path:
endpoint = f'export{remote_path}{filename}'
else:
endpoint = f'export/{filename}'
url = f'{file_api_url(env, pnum, "files", endpoint=endpoint)}'
headers = {'Authorization': f'Bearer {token}'}
print(f'deleting: {filename}')
Expand All @@ -433,6 +453,7 @@ def export_list(
page: Optional[str] = None,
group: Optional[str] = None,
per_page: Optional[int] = None,
remote_path: Optional[str] = None,
) -> dict:
"""
Get the list of files available for export.
Expand All @@ -450,8 +471,28 @@ def export_list(
per_page: number of files to list per page
"""
resource = f'/{directory}' if directory else ''
endpoint = f'export{resource}'
resource = directory if directory else ''
if remote_path:

if not resource :
# checks if remote path is a file or a directory
split_path = remote_path.split('/')
end_name = split_path[-2]
parent_dir = '/'.join(split_path[:-2])
if not parent_dir:
parent_dir = None
parent_list = export_list(env, pnum, token, backend, session, directory=parent_dir)
exists = False
for file in parent_list['files']:
if file['filename'] == end_name:
exists = True
if file["mime-type"] != "directory":
sys.exit(f'{remote_path} is a file, not a directory')
if not exists:
sys.exit(f'{remote_path} does not exist')
endpoint = f"export{remote_path}{resource}"
else:
endpoint = f'export/{resource}'
url = f'{file_api_url(env, pnum, backend, endpoint=endpoint, page=page, per_page=per_page)}'
headers = {'Authorization': 'Bearer {0}'.format(token)}
debug_step(f'listing resources at {url}')
Expand All @@ -470,9 +511,13 @@ def export_head(
token: str,
backend: str = 'files',
session: Any = requests,
remote_path: Optional[str] = None,
) -> requests.Response:
headers = {'Authorization': 'Bearer {0}'.format(token), "Accept-Encoding": "*"}
endpoint = f'export/{filename}'
if remote_path:
endpoint = f"export{remote_path}{filename}"
else:
endpoint = f'export/{filename}'
url = f'{file_api_url(env, pnum, backend, endpoint=endpoint)}'
resp = session.head(url, headers=headers)
return resp
Expand All @@ -497,6 +542,7 @@ def export_get(
refresh_token: Optional[str] = None,
refresh_target: Optional[int] = None,
public_key: Optional["libnacl.public.PublicKey"] = None,
remote_path: Optional[str] = None,
) -> dict:
"""
Download a file to the current directory.
Expand All @@ -512,7 +558,7 @@ def export_get(
dev_url: development url
backend: API backend
session: requests.session
no_print_id: supress printing the download id
no_print_id: suppress printing the download id
set_mtime: set local file mtime to be the same as remote resource
nobar: disable the progress bar
target_dir: where to save the file locally
Expand Down Expand Up @@ -540,15 +586,21 @@ def export_get(
if dev_url:
url = dev_url
else:
urlpath = '' if backend == 'survey' else 'export/'
if backend == 'files':
urlpath = ''
else:
if remote_path:
urlpath = f"export{remote_path}"
else:
urlpath = 'export/'
endpoint = f'{urlpath}{filename}'
# make provision for unsatisfactory semantics
if backend in ['export', 'files']:
service = 'files'
elif backend == 'survey':
service = backend
url = f'{file_api_url(env, pnum, service, endpoint=endpoint)}'
debug_step(f'fecthing file info using: {url}')
debug_step(f'fetching file info using: {url}')
resp = session.head(url, headers=headers)
resp.raise_for_status()
try:
Expand Down Expand Up @@ -614,10 +666,14 @@ def _resumable_url(
backend: str = 'files',
is_dir: bool = False,
group: Optional[str] = None,
remote_path: Optional[str] = None,
) -> str:
resource = upload_resource_name(filename, is_dir, group=group)
resource = upload_resource_name(filename, is_dir, group=group, remote_path=remote_path)
if not dev_url:
endpoint = f'stream/{resource}'
if remote_path:
endpoint = f"stream{remote_path}{resource}"
else:
endpoint = f"stream/{resource}"
url = f'{file_api_url(env, pnum, backend, endpoint=endpoint)}'
else:
url = dev_url
Expand Down Expand Up @@ -678,6 +734,7 @@ def get_resumable(
api_key: Optional[str] = None,
refresh_token: Optional[str] = None,
refresh_target: Optional[int] = None,
remote_path: Optional[str] = None,
) -> dict:
"""
List uploads which can be resumed.
Expand All @@ -688,8 +745,13 @@ def get_resumable(
"""
if not dev_url:
filename = f'/{quote(format_filename(filename))}' if filename else ''
endpoint = f'resumables{filename}'
if filename:
filename = f'{quote(format_filename(filename))}'
if remote_path:
filename = f'{remote_path}{filename}'
endpoint = f'resumables{filename}'
else:
endpoint = 'resumables'
url = f'{file_api_url(env, pnum, backend, endpoint=endpoint)}'
else:
url = dev_url
Expand Down Expand Up @@ -726,6 +788,7 @@ def initiate_resumable(
api_key: Optional[str] = None,
refresh_token: Optional[str] = None,
refresh_target: Optional[int] = None,
remote_path: Optional[str] = None,
) -> dict:
"""
Performs a resumable upload, either by resuming a partial one,
Expand All @@ -734,10 +797,10 @@ def initiate_resumable(
Parameters
----------
env: 'test' or 'prod'
pnum: project numnber
pnum: project number
filename: filename
token: JWT
chunksize: user specified chunkszie in bytes
chunksize: user specified chunksize in bytes
new: flag to enable resume
group: group owner after upload
verify: verify md5 chunk integrity before resume
Expand Down Expand Up @@ -774,6 +837,7 @@ def initiate_resumable(
api_key=api_key,
refresh_token=refresh_token,
refresh_target=refresh_target,
remote_path=remote_path,
)
if data.get('tokens'):
tokens = data.get('tokens')
Expand Down Expand Up @@ -805,6 +869,7 @@ def initiate_resumable(
api_key=api_key,
refresh_token=refresh_token,
refresh_target=refresh_target,
remote_path=remote_path,
)
except Exception as e:
print(e)
Expand All @@ -827,14 +892,14 @@ def initiate_resumable(
api_key=api_key,
refresh_token=refresh_token,
refresh_target=refresh_target,
remote_path=remote_path,
)


@handle_request_errors
def _complete_resumable(
env: str,
pnum: str,
filename: str,
token: str,
url: str,
bar: Bar,
Expand Down Expand Up @@ -875,13 +940,14 @@ def _start_resumable(
api_key: Optional[str] = None,
refresh_token: Optional[str] = None,
refresh_target: Optional[int] = None,
remote_path: Optional[str] = None,
) -> dict:
"""
Start a new resumable upload, reading a file, chunk-by-chunk
and performaing a PATCH request per chunk.
and performing a PATCH request per chunk.
"""
url = _resumable_url(env, pnum, filename, dev_url, backend, is_dir, group=group)
url = _resumable_url(env, pnum, filename, dev_url, backend, is_dir, group=group, remote_path=remote_path)
headers = {'Authorization': f'Bearer {token}'}
current_mtime = os.stat(filename).st_mtime if set_mtime else None
if set_mtime:
Expand Down Expand Up @@ -926,7 +992,6 @@ def _start_resumable(
resp = _complete_resumable(
env,
pnum,
filename,
token,
parmaterised_url,
bar,
Expand All @@ -935,6 +1000,7 @@ def _start_resumable(
api_key=api_key,
refresh_token=refresh_token,
refresh_target=refresh_target,
remote_path=remote_path,
)
if not tokens:
tokens = resp.get('tokens')
Expand All @@ -959,16 +1025,17 @@ def _continue_resumable(
api_key: Optional[str] = None,
refresh_token: Optional[str] = None,
refresh_target: Optional[int] = None,
remote_path: Optional[str] = None,
) -> dict:
"""
Continue a resumable upload, reading a file, from the
appopriate byte offset, chunk-by-chunk and performaing
appropriate byte offset, chunk-by-chunk and performing
a PATCH request per chunk. Optional chunk md5 verification
before resume.
"""
tokens = {}
url = _resumable_url(env, pnum, filename, dev_url, backend, is_dir, group=group)
url = _resumable_url(env, pnum, filename, dev_url, backend, is_dir, group=group, remote_path=remote_path)
headers = {'Authorization': f'Bearer {token}'}
current_mtime = os.stat(filename).st_mtime if set_mtime else None
if set_mtime:
Expand Down Expand Up @@ -1013,7 +1080,6 @@ def _continue_resumable(
resp = _complete_resumable(
env,
pnum,
filename,
token,
parmaterised_url,
bar,
Expand All @@ -1022,6 +1088,7 @@ def _continue_resumable(
api_key=api_key,
refresh_token=refresh_token,
refresh_target=refresh_target,
remote_path=remote_path,
)
if not tokens:
tokens = resp.get('tokens')
Expand Down
Loading

0 comments on commit fb2fe5c

Please sign in to comment.