Skip to content

Commit

Permalink
Add configurable part_size (#745)
Browse files Browse the repository at this point in the history
  • Loading branch information
harshavardhana authored and kannappanr committed Mar 27, 2019
1 parent 7c71b73 commit 9bf406c
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 67 deletions.
88 changes: 45 additions & 43 deletions docs/API.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,18 @@ s3Client = Minio('s3.amazonaws.com',



|Bucket operations | Object operations| Presigned operations | Bucket policy/notification operations
|:---|:---|:---|:---|
| [`make_bucket`](#make_bucket) | [`get_object`](#get_object) | [`presigned_get_object`](#presigned_get_object) | [`get_bucket_policy`](#get_bucket_policy) |
| [`list_buckets`](#list_buckets) | [`put_object`](#put_object) | [`presigned_put_object`](#presigned_put_object) | [`set_bucket_policy`](#set_bucket_policy) |
| [`bucket_exists`](#bucket_exists) | [`copy_object`](#copy_object) | [`presigned_post_policy`](#presigned_post_policy) | [`get_bucket_notification`](#get_bucket_notification) |
| [`remove_bucket`](#remove_bucket) | [`stat_object`](#stat_object) | | [`set_bucket_notification`](#set_bucket_notification) |
| [`list_objects`](#list_objects) | [`remove_object`](#remove_object) | | [`remove_all_bucket_notification`](#remove_all_bucket_notification) |
| [`list_objects_v2`](#list_objects_v2) | [`remove_objects`](#remove_objects) | | [`listen_bucket_notification`](#listen_bucket_notification) |
| [`list_incomplete_uploads`](#list_incomplete_uploads) | [`remove_incomplete_upload`](#remove_incomplete_upload) | | |
| | [`fput_object`](#fput_object) | | |
| | [`fget_object`](#fget_object) | | |
| | [`get_partial_object`](#get_partial_object) | | |
| Bucket operations | Object operations | Presigned operations | Bucket policy/notification operations |
|:------------------------------------------------------|:--------------------------------------------------------|:--------------------------------------------------|:--------------------------------------------------------------------|
| [`make_bucket`](#make_bucket) | [`get_object`](#get_object) | [`presigned_get_object`](#presigned_get_object) | [`get_bucket_policy`](#get_bucket_policy) |
| [`list_buckets`](#list_buckets) | [`put_object`](#put_object) | [`presigned_put_object`](#presigned_put_object) | [`set_bucket_policy`](#set_bucket_policy) |
| [`bucket_exists`](#bucket_exists) | [`copy_object`](#copy_object) | [`presigned_post_policy`](#presigned_post_policy) | [`get_bucket_notification`](#get_bucket_notification) |
| [`remove_bucket`](#remove_bucket) | [`stat_object`](#stat_object) | | [`set_bucket_notification`](#set_bucket_notification) |
| [`list_objects`](#list_objects) | [`remove_object`](#remove_object) | | [`remove_all_bucket_notification`](#remove_all_bucket_notification) |
| [`list_objects_v2`](#list_objects_v2) | [`remove_objects`](#remove_objects) | | [`listen_bucket_notification`](#listen_bucket_notification) |
| [`list_incomplete_uploads`](#list_incomplete_uploads) | [`remove_incomplete_upload`](#remove_incomplete_upload) | | |
| | [`fput_object`](#fput_object) | | |
| | [`fget_object`](#fget_object) | | |
| | [`get_partial_object`](#get_partial_object) | | |

## 1. Constructor

Expand Down Expand Up @@ -474,7 +474,7 @@ There is no return value. If there are errors from the target
server/service, a `ResponseError` is thrown. If there are validation
errors, `InvalidArgumentError` or `TypeError` may be thrown. The input
configuration cannot be empty - to delete the notification
configuration on a bucket, use the `remove_all_bucket_notifications()`
configuration on a bucket, use the `remove_all_bucket_notification()`
API.

__Example__
Expand Down Expand Up @@ -544,8 +544,8 @@ except (ArgumentError, TypeError) as err:
print(err)
```

<a name="remove_all_bucket_notifications"></a>
### remove_all_bucket_notifications(bucket_name)
<a name="remove_all_bucket_notification"></a>
### remove_all_bucket_notification(bucket_name)

Remove all notifications configured on the bucket.

Expand All @@ -563,7 +563,7 @@ __Example__

```py
# Remove all the notifications config for a bucket.
minioClient.remove_all_bucket_notifications('mybucket')
minioClient.remove_all_bucket_notification('mybucket')
```

<a name="listen_bucket_notification"></a>
Expand Down Expand Up @@ -763,29 +763,30 @@ except ResponseError as err:
```

<a name="put_object"></a>
### put_object(bucket_name, object_name, data, length, content_type='application/octet-stream', metadata=None)
### put_object(bucket_name, object_name, data, length, content_type='application/octet-stream', metadata=None, progress=None, part_size=5*1024*1024)
Add a new object to the object storage server. If provided metadata key is not one of the valid/supported metadata names, the metadata information is saved with prefix `X-Amz-Meta-` prepended to the original metadata key name.

NOTE: Maximum object size supported by this API is 5TiB.

__Parameters__

|Param |Type |Description |
|:---|:---|:---|
|``bucket_name`` |_string_ |Name of the bucket. |
|``object_name`` |_string_ |Name of the object. |
|``data`` |_io.RawIOBase_ |Any python object implementing io.RawIOBase. |
|``length`` |_int_ |Total length of object. |
|``content_type`` |_string_ | Content type of the object. (optional, defaults to 'application/octet-stream'). |
|``metadata`` |_dict_ | Any additional metadata. (optional, defaults to None). |
|``sse`` |_dict_ |Server-Side Encryption headers (optional, defaults to None). |
|``progress`` |_subclass_of_threading_ |A progress object (optional, defaults to None). |
| Param | Type | Description |
|:-----------------|:------------------------|:--------------------------------------------------------------------------------|
| ``bucket_name`` | _string_ | Name of the bucket. |
| ``object_name`` | _string_ | Name of the object. |
| ``data`` | _io.RawIOBase_ | Any python object implementing io.RawIOBase. |
| ``length`` | _int_ | Total length of object. |
| ``content_type`` | _string_ | Content type of the object. (optional, defaults to 'application/octet-stream'). |
| ``metadata`` | _dict_ | Any additional metadata. (optional, defaults to None). |
| ``sse`` | _dict_ | Server-Side Encryption headers (optional, defaults to None). |
| ``progress`` | _subclass_of_threading_ | A progress object (optional, defaults to None). |
| ``part_size`` | _int_ | Multipart part size. |

__Return Value__

|Param |Type |Description |
|:---|:---|:---|
|``etag``|_string_ |Object etag computed by the server. |
| Param | Type | Description |
|:---------|:---------|:------------------------------------|
| ``etag`` | _string_ | Object etag computed by the server. |

__Example__

Expand Down Expand Up @@ -814,26 +815,27 @@ except ResponseError as err:
```

<a name="fput_object"></a>
### fput_object(bucket_name, object_name, file_path, content_type='application/octet-stream', metadata=None)
### fput_object(bucket_name, object_name, file_path, content_type='application/octet-stream', metadata=None, progress=None, part_size=5*1024*1024)
Uploads contents from a file, `file_path`, to `object_name`. If provided metadata key is not one of the valid/supported metadata names, the metadata information is saved with prefix `X-Amz-Meta-` prepended to the original metadata key name.

__Parameters__

|Param |Type |Description |
|:---|:---|:---|
|``bucket_name`` |_string_ |Name of the bucket. |
|``object_name`` |_string_ |Name of the object. |
|``file_path`` |_string_ |Path on the local filesystem from which object data will be read. |
|``content_type`` |_string_ | Content type of the object (optional, defaults to 'application/octet-stream'). |
|``metadata`` |_dict_ | Any additional metadata (optional, defaults to None). |
|``sse`` |_dict_ |Server-Side Encryption headers (optional, defaults to None). |
|``progress`` |_subclass_of_threading_ |A progress object (optional, defaults to None). |
| Param | Type | Description |
|:-----------------|:------------------------|:-------------------------------------------------------------------------------|
| ``bucket_name`` | _string_ | Name of the bucket. |
| ``object_name`` | _string_ | Name of the object. |
| ``file_path`` | _string_ | Path on the local filesystem from which object data will be read. |
| ``content_type`` | _string_ | Content type of the object (optional, defaults to 'application/octet-stream'). |
| ``metadata`` | _dict_ | Any additional metadata (optional, defaults to None). |
| ``sse`` | _dict_ | Server-Side Encryption headers (optional, defaults to None). |
| ``progress`` | _subclass_of_threading_ | A progress object (optional, defaults to None). |
| ``part_size`` | _int_ | Multipart part size. |

__Return Value__

|Param |Type |Description |
|:---|:---|:---|
|``etag``|_string_ |Object etag computed by the server. |
| Param | Type | Description |
|:---------|:---------|:------------------------------------|
| ``etag`` | _string_ | Object etag computed by the server. |

__Example__

Expand Down
8 changes: 4 additions & 4 deletions docs/zh_CN/API.md
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ notification = minioClient.get_bucket_notification('mybucket')
* __Value__ (string) -- 指定规则适用的值。


没有返回值。如果目标服务报错,会抛出`ResponseError`。如果有验证错误,会抛出`InvalidArgumentError`或者`TypeError`。输入参数的configuration不能为空 - 为了删除存储桶上的通知配置,参考`remove_all_bucket_notifications()` API。
没有返回值。如果目标服务报错,会抛出`ResponseError`。如果有验证错误,会抛出`InvalidArgumentError`或者`TypeError`。输入参数的configuration不能为空 - 为了删除存储桶上的通知配置,参考`remove_all_bucket_notification()` API。

__示例__

Expand Down Expand Up @@ -508,8 +508,8 @@ except (ArgumentError, TypeError) as err:
print(err)
```

<a name="remove_all_bucket_notifications"></a>
### remove_all_bucket_notifications(bucket_name)
<a name="remove_all_bucket_notification"></a>
### remove_all_bucket_notification(bucket_name)

删除存储桶上配置的所有通知。

Expand All @@ -526,7 +526,7 @@ __示例__

```py
# Remove all the notifications config for a bucket.
minioClient.remove_all_bucket_notifications('mybucket')
minioClient.remove_all_bucket_notification('mybucket')
```

<a name="listen_bucket_notification"></a>
Expand Down
2 changes: 1 addition & 1 deletion examples/remove_all_bucket_notification.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@

try:
# Remove all notification config for a bucket.
client.remove_all_bucket_notifications('my-bucketname')
client.remove_all_bucket_notification('my-bucketname')
except ResponseError as err:
print(err)
42 changes: 31 additions & 11 deletions minio/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,11 @@
mkdir_p, dump_http, amzprefix_user_metadata,
is_supported_header,is_amz_header)
from .helpers import (MAX_MULTIPART_OBJECT_SIZE,
MAX_PART_SIZE,
MAX_POOL_SIZE,
MIN_PART_SIZE)
MIN_PART_SIZE,
DEFAULT_PART_SIZE,
MAX_MULTIPART_COUNT)
from .signer import (sign_v4, presign_v4,
generate_credential_string,
post_presign_signature)
Expand Down Expand Up @@ -541,7 +544,8 @@ def listen_bucket_notification(self, bucket_name, prefix='', suffix='',

def fput_object(self, bucket_name, object_name, file_path,
content_type='application/octet-stream',
metadata=None, sse=None, progress=None):
metadata=None, sse=None, progress=None,
part_size=DEFAULT_PART_SIZE):
"""
Add a new object to the cloud storage server.
Expand All @@ -555,14 +559,16 @@ def fput_object(self, bucket_name, object_name, file_path,
:param metadata: Any additional metadata to be uploaded along
with your PUT request.
:param progress: A progress object
:param part_size: Multipart part size
:return: etag
"""

# Open file in 'read' mode.
with open(file_path, 'rb') as file_data:
file_size = os.stat(file_path).st_size
return self.put_object(bucket_name, object_name, file_data,
file_size, content_type, metadata, sse, progress)
file_size, content_type, metadata, sse,
progress, part_size)

def fget_object(self, bucket_name, object_name, file_path, request_headers=None, sse=None):
"""
Expand Down Expand Up @@ -751,7 +757,8 @@ def copy_object(self, bucket_name, object_name, object_source,

def put_object(self, bucket_name, object_name, data, length,
content_type='application/octet-stream',
metadata=None, sse=None, progress=None):
metadata=None, sse=None, progress=None,
part_size=DEFAULT_PART_SIZE):
"""
Add a new object to the cloud storage server.
Expand All @@ -775,6 +782,7 @@ def put_object(self, bucket_name, object_name, data, length,
:param metadata: Any additional metadata to be uploaded along
with your PUT request.
:param progress: A progress object
:param part_size: Multipart part size
:return: etag
"""

Expand All @@ -792,9 +800,18 @@ def put_object(self, bucket_name, object_name, data, length,
raise ValueError(
'Invalid input data does not implement a callable read() method')

if length > MAX_MULTIPART_OBJECT_SIZE:
raise InvalidArgumentError('Input content size is bigger '
' than allowed maximum of 5TiB.')
if length > (part_size * MAX_MULTIPART_COUNT):
raise InvalidArgumentError('Part size * max_parts(10000) is '
' lesser than input length.')

if part_size < MIN_PART_SIZE:
raise InvalidArgumentError('Input part size is smaller '
' than allowed minimum of 5MiB.')

if part_size > MAX_PART_SIZE:
raise InvalidArgumentError('Input part size is bigger '
' than allowed maximum of 5GiB.')

if not metadata:
metadata = {}

Expand All @@ -803,10 +820,11 @@ def put_object(self, bucket_name, object_name, data, length,
metadata['Content-Type'] = 'application/octet-stream' if \
not content_type else content_type

if length > MIN_PART_SIZE:
if length > part_size:
return self._stream_put_object(bucket_name, object_name,
data, length, metadata=metadata,
sse=sse, progress=progress)
sse=sse, progress=progress,
part_size=part_size)

current_data = data.read(length)
if len(current_data) != length:
Expand Down Expand Up @@ -1550,7 +1568,8 @@ def _upload_part_routine(self, part_info):

def _stream_put_object(self, bucket_name, object_name,
data, content_size,
metadata=None, sse=None, progress=None):
metadata=None, sse=None,
progress=None, part_size=MIN_PART_SIZE):
"""
Streaming multipart upload operation.
Expand All @@ -1562,6 +1581,7 @@ def _stream_put_object(self, bucket_name, object_name,
:param metadata: Any additional metadata to be uploaded along
with your object.
:param progress: A progress object
:param part_size: Multipart part size
"""
is_valid_bucket_name(bucket_name)
is_non_empty_string(object_name)
Expand All @@ -1579,7 +1599,7 @@ def _stream_put_object(self, bucket_name, object_name,

# Calculate optimal part info.
total_parts_count, part_size, last_part_size = optimal_part_info(
content_size)
content_size, part_size)

# Instantiate a thread pool with 3 worker threads
pool = ThreadPool(_PARALLEL_UPLOADERS)
Expand Down
8 changes: 5 additions & 3 deletions minio/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,10 @@
# Constants
MAX_MULTIPART_COUNT = 10000 # 10000 parts
MAX_MULTIPART_OBJECT_SIZE = 5 * 1024 * 1024 * 1024 * 1024 # 5TiB
MAX_PART_SIZE = 5 * 1024 * 1024 * 1024 # 5GiB
MAX_POOL_SIZE = 10
MIN_PART_SIZE = 5 * 1024 * 1024 # 5MiB
DEFAULT_PART_SIZE = MIN_PART_SIZE # Currently its 5MiB

_VALID_BUCKETNAME_REGEX = re.compile('^[a-z0-9][a-z0-9\\.\\-]+[a-z0-9]$')
_ALLOWED_HOSTNAME_REGEX = re.compile(
Expand Down Expand Up @@ -616,7 +618,7 @@ def get_md5_base64digest(content):
return Hasher.md5(content).base64digest()


def optimal_part_info(length):
def optimal_part_info(length, part_size):
"""
Calculate optimal part size for multipart uploads.
Expand All @@ -633,8 +635,8 @@ def optimal_part_info(length):
# Use floats for part size for all calculations to avoid
# overflows during float64 to int64 conversions.
part_size_float = math.ceil(length/MAX_MULTIPART_COUNT)
part_size_float = (math.ceil(part_size_float/MIN_PART_SIZE)
* MIN_PART_SIZE)
part_size_float = (math.ceil(part_size_float/part_size)
* part_size)
# Total parts count.
total_parts_count = int(math.ceil(length/part_size_float))
# Part size.
Expand Down
Loading

0 comments on commit 9bf406c

Please sign in to comment.