Skip to content

Commit

Permalink
Merge branch 'release-0.3.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
LourensVeen committed Nov 2, 2018
2 parents 49d3827 + 6b549de commit 0e844fc
Show file tree
Hide file tree
Showing 21 changed files with 648 additions and 111 deletions.
17 changes: 17 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,23 @@ Change Log
All notable changes to this project will be documented in this file.
This project adheres to `Semantic Versioning <http://semver.org/>`_.

0.3.0
*****

Added
-----

* New copy_permissions option to copy()
* New callback option to copy()
* New Path.walk() method

Fixed
-----

* Add missing EntryType and Permission classes to API
* SFTP-to-SFTP copy deadlock


0.2.0
*****

Expand Down
1 change: 1 addition & 0 deletions MANIFEST.in
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
include LICENSE
include README.rst
include cerulean/py.typed
15 changes: 8 additions & 7 deletions cerulean/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
__email__ = 'l.veen@esciencecenter.nl'


from cerulean.copy_files import copy
from cerulean.copy_files import copy, CopyCallback
from cerulean.credential import Credential, PasswordCredential, PubKeyCredential
from cerulean.direct_gnu_scheduler import DirectGnuScheduler
from cerulean.factory import make_file_system, make_terminal, make_scheduler
Expand All @@ -21,7 +21,7 @@
from cerulean.job_status import JobStatus
from cerulean.local_file_system import LocalFileSystem
from cerulean.local_terminal import LocalTerminal
from cerulean.path import Path
from cerulean.path import EntryType, Permission, Path
from cerulean.scheduler import Scheduler
from cerulean.sftp_file_system import SftpFileSystem
from cerulean.slurm_scheduler import SlurmScheduler
Expand All @@ -48,8 +48,9 @@
"""

__all__ = ['copy', 'logger', 'make_file_system', 'make_terminal',
'make_scheduler', 'Credential', 'PasswordCredential',
'PubKeyCredential', 'DirectGnuScheduler', 'FileSystem',
'JobDescription', 'JobStatus', 'LocalFileSystem', 'LocalTerminal',
'Path', 'Scheduler', 'SftpFileSystem', 'SlurmScheduler',
'SshTerminal', 'Terminal', 'TorqueScheduler']
'make_scheduler', 'CopyCallback', 'Credential', 'EntryType',
'PasswordCredential', 'Permission', 'PubKeyCredential',
'DirectGnuScheduler', 'FileSystem', 'JobDescription', 'JobStatus',
'LocalFileSystem', 'LocalTerminal', 'Path', 'Scheduler',
'SftpFileSystem', 'SlurmScheduler', 'SshTerminal', 'Terminal',
'TorqueScheduler']
261 changes: 208 additions & 53 deletions cerulean/copy_files.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,25 @@
import logging
from typing import Optional
from time import perf_counter
from typing import Callable, Generator, Iterable, Optional

from cerulean.path import Path
from cerulean.path import Path, Permission


CopyCallback = Callable[[int, int], None]
"""The type of a callback function for the copy() function.
A callback function takes two arguments, the number of bytes copied \
thus far, and the approximate total number of bytes to copy. To \
interrupt the copy operation, raise an exception.
"""


def copy(source_path: Path,
target_path: Path,
overwrite: str = 'never',
copy_into: bool = True) -> None:
copy_into: bool = True,
copy_permissions: bool = False,
callback: Optional[CopyCallback] = None) -> None:
"""Copy a file or directory from one path to another.
Note that source_path and target_path may be paths on different \
Expand All @@ -28,11 +40,31 @@ def copy(source_path: Path,
be copied on top of the directory, subject to the setting for \
overwrite.
If copy_permissions is True, this function will make the target's \
permissions match those of the source, `including` SETUID, SETGID \
and sticky bits. If copy_permissions is False, the target's \
permissions are left at their default values (according to the \
umask, on Unix-like systems), less any permissions that the source \
file does not have.
If callback is provided, it should be a function taking two \
arguments, the current count of bytes copied and the total number \
of bytes to be copied. It will be called once at the beginning of \
the copy operation (with count == 0), once at the end (with count \
== total), and in between about once per second, if the copy takes \
long enough. Note that the total number of bytes passed to the \
callback is approximate, and that the count may be larger than \
the total if the estimate was off. To abort the copy, raise an \
exception from the callback function.
Args:
source_path: The path to the source file.
target_path: A path to copy it to.
overwrite: Selects behaviour when the target exists.
copy_into: Whether to copy into target directories.
copy_permissions: Whether to copy permissions along.
callback: A callback function to call regularly with progress \
reports.
"""
if overwrite not in ['always', 'never', 'raise']:
raise ValueError('Invalid value for overwrite. Valid values are' +
Expand All @@ -41,11 +73,22 @@ def copy(source_path: Path,
if target_path.is_dir() and copy_into:
target_path = target_path / source_path.name

_copy(source_path, target_path, overwrite, source_path)
size = _get_approx_size(source_path)

if callback is not None:
callback(0, size)

total_written = _copy(source_path, target_path, overwrite,
copy_permissions, source_path, callback, 0, size)

if callback is not None:
callback(total_written, size)


def _copy(source_path: Path, target_path: Path, overwrite: str,
context: Optional[Path]) -> None:
copy_permissions: bool, context: Optional[Path],
callback: Optional[CopyCallback], already_written: int, size: int
) -> int:
"""Copy a file or directory from one path to another.
See the documentation of copy() for the required behaviour.
Expand All @@ -57,63 +100,175 @@ def _copy(source_path: Path, target_path: Path, overwrite: str,
source_path: The path to the source file.
target_path: A path to copy it to.
overwrite: Selects behaviour when the target exists.
copy_permissions: Whether to copy permissions along.
context: Root of the tree we are copying, or None.
callback: A callback function to call.
already_written: Starting count of bytes written.
size: Approximate total size of data to copy.
Returns:
The approximate total number of bytes written.
"""
logging.debug('Copying {} to {}'.format(source_path, target_path))
target_path_exists = target_path.exists() or target_path.is_symlink()
if source_path.is_symlink():
if not target_path_exists or overwrite == 'always':
if context is not None:
linked_path = source_path.readlink(recursive=False)
if context in linked_path.parents:
rel_path = linked_path.relative_to(context)
logging.debug('Making relative link from {} to {}'.format(
target_path, rel_path))
target_fs = target_path.filesystem
if target_path.exists() or target_path.is_symlink():
if target_path.is_dir():
target_path.rmdir(recursive=True)
else:
target_path.unlink()
target_path.symlink_to(target_fs / str(rel_path))
return
elif overwrite == 'raise':
raise FileExistsError('Target path exists, not overwriting')
else:
return

if _copy_symlink(source_path, target_path, overwrite, context):
return already_written
if source_path.is_file():
if not target_path_exists or overwrite == 'always':
# TODO: permissions
# touch with correct mode
try:
logging.debug('Copying file from {} to {}'.format(
source_path, target_path))
target_path.streaming_write(source_path.streaming_read())
except PermissionError:
already_written = _copy_file(source_path, target_path, overwrite,
copy_permissions, callback,
already_written, size)
elif source_path.is_dir():
already_written = _copy_dir(source_path, target_path, overwrite,
copy_permissions, context, callback,
already_written, size)
else:
# We don't copy special entries or broken links
logging.debug(
'Skipping special entry or broken link {}'.format(source_path))
return already_written


def _copy_symlink(source_path: Path, target_path: Path, overwrite: str,
context: Optional[Path]) -> bool:
"""Copy a symlink.
If overwrite is True, overwrites the target. Copies links within \
the context as links and returns True, otherwise returns False.
"""
target_path_exists = target_path.exists() or target_path.is_symlink()
if not target_path_exists or overwrite == 'always':
if context is not None:
linked_path = source_path.readlink(recursive=False)
if context in linked_path.parents:
rel_path = linked_path.relative_to(context)
logging.debug('Making relative link from {} to {}'.format(
target_path, rel_path))
target_fs = target_path.filesystem
if target_path.exists() or target_path.is_symlink():
if target_path.is_dir():
target_path.rmdir(recursive=True)
else:
target_path.unlink()
target_path.symlink_to(target_fs / str(rel_path))
return True
return False # fall through and copy as file or directory
elif overwrite == 'raise':
raise FileExistsError('Target path exists, not overwriting')
return True # target path exists, fail silently


def _copy_file(source_path: Path, target_path: Path, overwrite: str,
copy_permissions: bool, callback: Optional[CopyCallback],
already_written: int, size: int) -> int:
"""Copy a file.
Returns the number of bytes written.
"""
target_path_exists = target_path.exists() or target_path.is_symlink()

if not target_path_exists or overwrite == 'always':
logging.debug('Copying file from {} to {}'.format(
source_path, target_path))

if not target_path.is_symlink() and target_path.is_dir():
target_path.rmdir(recursive=True)
elif target_path.exists():
target_path.unlink()
target_path.touch()

perms = dict()
for permission in Permission:
perms[permission] = target_path.has_permission(permission)

target_path.chmod(0o600)
target_path.streaming_write(
_call_back(callback, perf_counter() + 1.0, already_written,
size, source_path.streaming_read()))

already_written += source_path.size()

for permission in Permission:
if copy_permissions:
target_path.set_permission(
permission, source_path.has_permission(permission))
else:
target_path.set_permission(
permission,
perms[permission]
and source_path.has_permission(permission))

elif overwrite == 'raise':
raise FileExistsError('Target path exists, not overwriting')

return already_written


def _call_back(callback: Optional[CopyCallback], next_callback: float,
already_written: int, total_size: int, stream: Iterable[bytes]
) -> Generator[bytes, None, None]:
"""Calls the callback every second or so.
"""
written_here = 0
for chunk in stream:
yield chunk
written_here += len(chunk)
if perf_counter() >= next_callback:
if callback is not None:
callback(already_written + written_here, total_size)
next_callback = perf_counter() + 1.0


def _copy_dir(source_path: Path, target_path: Path, overwrite: str,
copy_permissions: bool, context: Optional[Path],
callback: Optional[CopyCallback], already_written: int, size: int
) -> int:
"""Copy a directory recursively.
"""
target_path_exists = target_path.exists() or target_path.is_symlink()

if target_path_exists:
if overwrite == 'always':
if not target_path.is_dir():
target_path.unlink()
# touch with correct mode
target_path.streaming_write(source_path.streaming_read())
elif overwrite == 'raise':
raise FileExistsError('Target path exists, not overwriting')
else:
pass
elif overwrite == 'never':
return already_written

elif source_path.is_dir():
if (overwrite == 'always' and target_path.exists()
and not target_path.is_dir()):
target_path.unlink()
if not target_path.exists():
logging.debug('Making new dir {}'.format(target_path))
target_path.mkdir()

if not target_path.exists():
mode = 0o777 # TODO: read original mode and use
logging.debug('Making new dir {}'.format(target_path))
target_path.mkdir(mode)
perms = dict()
for permission in Permission:
perms[permission] = target_path.has_permission(permission)

for entry in source_path.iterdir():
logging.debug('Recursively copying entry {}'.format(entry))
_copy(entry, target_path / entry.name, overwrite, context)
else:
# We don't copy special entries or broken links
logging.debug(
'Skipping special entry or broken link {}'.format(source_path))
pass
target_path.chmod(0o700)

for entry in source_path.iterdir():
logging.debug('Recursively copying entry {}'.format(entry))
already_written = _copy(entry, target_path / entry.name, overwrite,
copy_permissions, context, callback,
already_written, size)

for permission in Permission:
if copy_permissions:
target_path.set_permission(
permission, source_path.has_permission(permission))
else:
target_path.set_permission(
permission,
perms[permission]
and source_path.has_permission(permission))
return already_written


def _get_approx_size(path: Path) -> int:
count = 0
if not path.is_symlink() and path.is_file():
count += path.size()
elif not path.is_symlink() and path.is_dir():
for subdir in path.iterdir():
count += _get_approx_size(subdir)
return count
Loading

0 comments on commit 0e844fc

Please sign in to comment.