Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions storage/google/cloud/storage/blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import os
import time
import warnings
import threading

from six.moves.urllib.parse import parse_qsl
from six.moves.urllib.parse import quote
Expand Down Expand Up @@ -100,6 +101,7 @@

_DEFAULT_CHUNKSIZE = 104857600 # 1024 * 1024 B * 100 = 100 MB
_MAX_MULTIPART_SIZE = 8388608 # 8 MB
_MAX_THREAD_LIMIT = 10


class Blob(_PropertyMixin):
Expand Down Expand Up @@ -187,6 +189,11 @@ def __init__(
if generation is not None:
self._properties["generation"] = generation

self._file_count = 0
self._total_files = 0
self._files_list = []
self._lock = threading.Lock()

@property
def chunk_size(self):
"""Get the blob's default chunk size.
Expand Down Expand Up @@ -1950,6 +1957,90 @@ def updated(self):
if value is not None:
return _rfc3339_to_datetime(value)

@staticmethod
def _get_files(path):
"""Get the list of full path of all files in the directory.
:param path: str
:return: The path to the directory.

:rtype: list
:returns: The list of all file path in the directory.
"""
filepath_list = []
for directory, _, filenames in os.walk(path):
for filename in filenames:
filepath_list.append(os.path.join(directory, filename))
return filepath_list

def upload_parallel(
    self, path, content_type=None, client=None, predefined_acl=None
):
    """Upload the contents of every file under ``path``, in parallel.

    Worker threads (at most ``_MAX_THREAD_LIMIT``) pull file paths from
    a shared list and upload each one via ``upload_from_filename``.

    The type of the uploaded content will be determined in the order
    of precedence:

    - The value passed in to this method (if not :data:`None`)
    - The value stored on the current blob
    - The value given by ``mimetypes.guess_type``
    - The default value ('application/octet-stream')

    .. note::
       The effect of uploading to an existing blob depends on the
       "versioning" and "lifecycle" policies defined on the blob's
       bucket. In the absence of those policies, upload will
       overwrite any existing contents.

       See the `object versioning
       <https://cloud.google.com/storage/docs/object-versioning>`_ and
       `lifecycle <https://cloud.google.com/storage/docs/lifecycle>`_
       API documents for details.

    If :attr:`user_project` is set on the bucket, bills the API request
    to that project.

    :type path: str
    :param path: The path to the directory.

    :type content_type: str
    :param content_type: Optional type of content being uploaded.

    :type client: :class:`~google.cloud.storage.client.Client`
    :param client: (Optional) The client to use. If not passed, falls back
                   to the ``client`` stored on the blob's bucket.

    :type predefined_acl: str
    :param predefined_acl: (Optional) predefined access control list
    """
    self._files_list = self._get_files(path)
    self._total_files = len(self._files_list)
    # Reset the shared index: without this, a second call to
    # ``upload_parallel`` on the same blob would resume from the stale
    # count left by the previous call and silently skip files.
    self._file_count = 0

    # Never start more threads than there are files to upload.
    threads = []
    max_thread_to_start = min(self._total_files, _MAX_THREAD_LIMIT)
    for _ in range(max_thread_to_start):
        thread = threading.Thread(
            target=self._upload_from_list,
            args=(content_type, client, predefined_acl),
        )
        thread.start()
        threads.append(thread)

    # Block until every worker has drained the file list.
    for thread in threads:
        thread.join()

def _upload_from_list(self, content_type, client, predefined_acl):
while self._total_files > self._file_count:
self._lock.acquire()
current_file_index = self._file_count
self._file_count += 1
self._lock.release()
self.upload_from_filename(
self._files_list[current_file_index],
content_type,
client,
predefined_acl,
)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now there is a different kind of a problem. If the upload fails, the counter is still increased. There should be some sort of a retry mechanism to recover from such state, or an index tracking, to handle each file individually.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think in the Storage module they didn't apply mechanism of retry, that's why they have a different task in git to implement retry mechanism in storage.
Please refer issue [7907].

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regardless of that, the possibility of racing should be eliminated.

time.sleep(0.3)


def _get_encryption_headers(key, source=False):
"""Builds customer encryption key headers
Expand Down
45 changes: 45 additions & 0 deletions storage/tests/unit/test_blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -1852,6 +1852,51 @@ def test_upload_from_filename(self):
self.assertEqual(stream.mode, "rb")
self.assertEqual(stream.name, temp.name)

def _do_upload_parallel_mock_call_helper(self, blob, client, content_type, size):
self.assertEqual(blob._do_upload.call_count, 10)
mock_call = blob._do_upload.mock_calls[0]
call_name, pos_args, kwargs = mock_call
self.assertEqual(call_name, "")
self.assertEqual(len(pos_args), 6)
self.assertEqual(pos_args[0], client)
self.assertEqual(pos_args[2], content_type)
self.assertEqual(pos_args[3], size)
self.assertIsNone(pos_args[4]) # num_retries
self.assertIsNone(pos_args[5]) # predefined_acl
self.assertEqual(kwargs, {})
return pos_args[1]

def test_upload_parallel(self):
    import tempfile
    import shutil

    blob = self._make_one("blob-name", bucket=None)
    # Mock low-level upload helper on blob (it is tested elsewhere).
    created_json = {"metadata": {"mint": "ice-cream"}}
    blob._do_upload = mock.Mock(return_value=created_json, spec=[])

    # Make sure `metadata` is empty before the request.
    self.assertIsNone(blob.metadata)
    data = b"soooo much data"
    content_type = u"image/svg+xml"
    client = mock.sentinel.client
    temp_dir = tempfile.mkdtemp()
    # Lower the thread cap for the test, and restore it in ``finally``
    # so the module-level constant does not leak into other tests.
    original_limit = google.cloud.storage.blob._MAX_THREAD_LIMIT
    google.cloud.storage.blob._MAX_THREAD_LIMIT = 5
    try:
        for _ in range(10):
            file_handle, name = tempfile.mkstemp(dir=temp_dir)
            os.close(file_handle)
            with open(name, "wb") as file_obj:
                file_obj.write(data)

        blob.upload_parallel(temp_dir, content_type, client)
        self.assertEqual(blob.metadata, created_json["metadata"])
        stream = self._do_upload_parallel_mock_call_helper(
            blob, client, content_type, len(data)
        )
        self.assertTrue(stream.closed)
        self.assertEqual(stream.mode, "rb")
    finally:
        # Always restore the global and remove the temp dir, even when
        # an assertion above fails.
        google.cloud.storage.blob._MAX_THREAD_LIMIT = original_limit
        shutil.rmtree(temp_dir)

def _upload_from_string_helper(self, data, **kwargs):
from google.cloud._helpers import _to_bytes

Expand Down