multipart: Avoid reading all parts at once before upload (#703)
The multipart upload was reading all parts and putting them
in memory before asking the thread manager to upload the
parts in parallel.

This commit changes the behavior to read a part only when a
worker becomes ready to upload.

vadmeste authored and kannappanr committed Oct 2, 2018
1 parent b60c1eb commit b0f2b71
Showing 2 changed files with 23 additions and 21 deletions.
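
The gist of the change, as a minimal standalone sketch (all names here are
illustrative, not the library's; the real change is in _stream_put_object
below): instead of materializing every part in a list and only then handing
that list to the pool, the workers are started first and each part is
enqueued as soon as it has been read, so uploads proceed while later parts
are still being read.

import queue
import threading

def upload_streamed(parts, num_workers=3):
    """ Sketch of the new behavior: enqueue each part as it is read """
    tasks, results = queue.Queue(), queue.Queue()

    def worker():
        while True:
            part = tasks.get()
            if part is None:               # sentinel: no more parts coming
                tasks.task_done()
                break
            results.put(len(part))         # stand-in for an actual part upload
            tasks.task_done()

    workers = [threading.Thread(target=worker) for _ in range(num_workers)]
    for w in workers:
        w.start()                          # workers are ready before reading
    for part in parts:                     # read one part at a time...
        tasks.put(part)                    # ...and hand it straight to a worker
    for _ in workers:
        tasks.put(None)                    # one sentinel per worker
    tasks.join()

    sizes = []
    while not results.empty():
        sizes.append(results.get())
    return sorted(sizes)

print(upload_streamed([b"abc", b"de", b"f"]))   # prints [1, 2, 3]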
minio/api.py (12 changes: 5 additions & 7 deletions)
@@ -1528,7 +1528,7 @@ def _stream_put_object(self, bucket_name, object_name,
 
         # Instantiate a thread pool with 3 worker threads
         pool = ThreadPool(_PARALLEL_UPLOADERS)
-        parts_to_upload = []
+        pool.start_parallel()
 
         # Generate new parts and upload <= current_part_size until
         # part_number reaches total_parts_count calculated for the
@@ -1539,13 +1539,11 @@ def _stream_put_object(self, bucket_name, object_name,
                                  else last_part_size)
 
             part_data = read_full(data, current_part_size)
-            # Append current part information
-            parts_to_upload.append((bucket_name, object_name, upload_id,
+            pool.add_task(self._upload_part_routine, (bucket_name, object_name, upload_id,
                                     part_number, part_data, sse))
 
-        # Run parts upload in parallel
         try:
-            pool.parallel_run(self._upload_part_routine, parts_to_upload)
+            upload_result = pool.result()
         except:
             # Any exception that occurs sends an abort on the
             # on-going multipart operation.
@@ -1556,8 +1554,8 @@ def _stream_put_object(self, bucket_name, object_name,
 
         # Update uploaded_parts with the part uploads result
         # and check total uploaded data.
-        while not pool.result().empty():
-            part_number, etag, total_read = pool.result().get()
+        while not upload_result.empty():
+            part_number, etag, total_read = upload_result.get()
             uploaded_parts[part_number] = UploadPart(bucket_name,
                                                      object_name,
                                                      upload_id,
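
A subtlety in the new pool.add_task(...) call above: as the thread_pool.py
diff below shows, add_task(self, func, *args, **kargs) packs its positional
arguments into a tuple and the worker later invokes func(*args, **kargs).
Passing a single tuple therefore means _upload_part_routine receives that
whole tuple as its one argument. A small illustration (demo_routine is a
made-up stand-in, not library code):

def demo_routine(part_info):
    # The worker unpacks exactly one level, so the tuple passed to
    # add_task arrives intact as the single parameter part_info.
    bucket_name, object_name, part_number = part_info
    return part_number

# What add_task(demo_routine, ("bucket", "obj", 1)) enqueues:
func, args, kargs = (demo_routine, (("bucket", "obj", 1),), {})
# ...and how Worker.run invokes it:
assert func(*args, **kargs) == 1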
minio/thread_pool.py (32 changes: 18 additions & 14 deletions)
@@ -42,20 +42,25 @@ def __init__(self, tasks_queue, results_queue, exceptions_queue):
         self.start()
 
     def run(self):
-        fast_quit = False
-        while not self.tasks_queue.empty():
-            func, args, kargs = self.tasks_queue.get()
-            if not fast_quit:
+        """ Continuously receive tasks and execute them """
+        while True:
+            task = self.tasks_queue.get()
+            if task is None:
+                self.tasks_queue.task_done()
+                break
+            # No exception detected in any thread,
+            # continue the execution.
+            if self.exceptions_queue.empty():
                 try:
+                    # Execute the task
+                    func, args, kargs = task
                     result = func(*args, **kargs)
                     self.results_queue.put(result)
                 except Exception as e:
                     self.exceptions_queue.put(e)
-                    fast_quit = True
+            # Mark this task as done, whether an exception happened or not
             self.tasks_queue.task_done()
 
 
 class ThreadPool:
     """ Pool of threads consuming tasks from a queue """
 
@@ -69,22 +74,21 @@ def add_task(self, func, *args, **kargs):
         """ Add a task to the queue """
         self.tasks_queue.put((func, args, kargs))
 
-    def parallel_run(self, func, args_list):
-        """ Add a list of tasks to the queue """
-        for args in args_list:
-            self.add_task(func, args)
-
+    def start_parallel(self):
+        """ Prepare threads to run tasks """
         for _ in range(self.num_threads):
             Worker(self.tasks_queue, self.results_queue, self.exceptions_queue)
 
+    def result(self):
+        """ Stop threads and return the result of all called tasks """
+        # Send None to all threads to cleanly stop them
+        for _ in range(self.num_threads):
+            self.tasks_queue.put(None)
         # Wait for completion of all the tasks in the queue
         self.tasks_queue.join()
         # Check if one of the threads raised an exception; if yes,
         # raise it here in the function
         if not self.exceptions_queue.empty():
             raise self.exceptions_queue.get()
-
-    def result(self):
-        """ Return the result of all called tasks """
-        return self.results_queue
+        return self.results_queue
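
Taken together, the reworked pool is driven in three steps, mirroring the
api.py hunks above: start_parallel() spawns the workers, add_task() feeds
them, and result() pushes one None sentinel per worker, waits on
tasks_queue.join(), re-raises the first captured worker exception, and hands
back the results queue. Note that Worker.run calls task_done() for every
get(), including sentinels and tasks skipped after a failure; without that,
join() would block forever. A usage sketch (square is an arbitrary example
task; the ThreadPool(3) constructor call is assumed from the
ThreadPool(_PARALLEL_UPLOADERS) usage in api.py):

from minio.thread_pool import ThreadPool

def square(n):
    return n * n

pool = ThreadPool(3)           # three workers
pool.start_parallel()          # spawn workers before queueing any task
for n in range(5):
    pool.add_task(square, n)   # enqueued as (square, (n,), {})

results = pool.result()        # sentinels + join + exception check
collected = []
while not results.empty():
    collected.append(results.get())
print(sorted(collected))       # [0, 1, 4, 9, 16]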
