add parallel blob object upload/4684 #6
```diff
@@ -33,6 +33,7 @@
 import os
 import time
 import warnings
+import threading
 
 from six.moves.urllib.parse import parse_qsl
 from six.moves.urllib.parse import quote
```
```diff
@@ -100,6 +101,7 @@
 _DEFAULT_CHUNKSIZE = 104857600  # 1024 * 1024 B * 100 = 100 MB
 _MAX_MULTIPART_SIZE = 8388608  # 8 MB
+_MAX_THREAD_LIMIT = 10
```
```diff
 class Blob(_PropertyMixin):
@@ -187,6 +189,11 @@ def __init__(
         if generation is not None:
             self._properties["generation"] = generation
 
+        self._file_count = 0
+        self._total_files = 0
+        self._files_list = []
+        self._lock = threading.Lock()
+
     @property
     def chunk_size(self):
         """Get the blob's default chunk size.
```
```diff
@@ -1950,6 +1957,90 @@ def updated(self):
         if value is not None:
             return _rfc3339_to_datetime(value)
 
+    @staticmethod
+    def _get_files(path):
+        """Get the full paths of all files in the directory.
+
+        :type path: str
+        :param path: The path to the directory.
+
+        :rtype: list
+        :returns: The list of all file paths in the directory.
+        """
+        filepath_list = []
+        for directory, _, filenames in os.walk(path):
+            for filename in filenames:
+                filepath_list.append(os.path.join(directory, filename))
+        return filepath_list
+
+    def upload_parallel(
+        self, path, content_type=None, client=None, predefined_acl=None
+    ):
+        """Upload the contents of the files in a directory, in parallel.
+
+        The type of the uploaded content is determined in order of
+        precedence:
+
+        - The value passed in to this method (if not :data:`None`)
+        - The value stored on the current blob
+        - The value given by ``mimetypes.guess_type``
+        - The default value ('application/octet-stream')
+
+        .. note::
+           The effect of uploading to an existing blob depends on the
+           "versioning" and "lifecycle" policies defined on the blob's
+           bucket.  In the absence of those policies, upload will
+           overwrite any existing contents.
+
+           See the `object versioning
+           <https://cloud.google.com/storage/docs/object-versioning>`_ and
+           `lifecycle <https://cloud.google.com/storage/docs/lifecycle>`_
+           API documents for details.
+
+        If :attr:`user_project` is set on the bucket, bills the API request
+        to that project.
+
+        :type path: str
+        :param path: The path to the directory.
+
+        :type content_type: str
+        :param content_type: (Optional) Type of content being uploaded.
+
+        :type client: :class:`~google.cloud.storage.client.Client`
+        :param client: (Optional) The client to use.  If not passed, falls
+                       back to the ``client`` stored on the blob's bucket.
+
+        :type predefined_acl: str
+        :param predefined_acl: (Optional) Predefined access control list.
+        """
+        threads = []
+        self._files_list = self._get_files(path)
+        self._total_files = len(self._files_list)
+        max_thread_to_start = min(self._total_files, _MAX_THREAD_LIMIT)
+        for _ in range(max_thread_to_start):
+            thread = threading.Thread(
+                target=self._upload_from_list,
+                args=(content_type, client, predefined_acl),
+            )
+            thread.start()
+            threads.append(thread)
+
+        for thread in threads:
+            thread.join()
+
+    def _upload_from_list(self, content_type, client, predefined_acl):
+        while self._total_files > self._file_count:
+            self._lock.acquire()
+            current_file_index = self._file_count
+            self._file_count += 1
+            self._lock.release()
+            self.upload_from_filename(
+                self._files_list[current_file_index],
+                content_type,
+                client,
+                predefined_acl,
+            )
```
Reviewer: Now there is a different kind of problem. If the upload fails, the counter is still increased. There should be some sort of retry mechanism to recover from such a state, or per-file index tracking, to handle each file individually.

Author (Owner): I think the Storage module itself doesn't apply a retry mechanism; that's why there is a separate task in the tracker to implement a retry mechanism in Storage.

Reviewer: Regardless of that, the possibility of racing should be eliminated.
```diff
+            time.sleep(0.3)
 
 
 def _get_encryption_headers(key, source=False):
     """Builds customer encryption key headers
```
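The `_get_files` helper added in this diff just walks the directory tree and collects full file paths. A standalone equivalent (same logic, lifted out of the class for illustration) can be exercised locally:

```python
import os


def get_files(path):
    """Return the full path of every file under ``path``, recursively.

    Mirrors the ``Blob._get_files`` static method from the diff above:
    ``os.walk`` visits each directory, and every filename is joined
    back onto its containing directory.
    """
    filepath_list = []
    for directory, _, filenames in os.walk(path):
        for filename in filenames:
            filepath_list.append(os.path.join(directory, filename))
    return filepath_list
```

Note that `os.walk` returns relative paths when given a relative `path`, so callers who need absolute paths should pass an absolute directory.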
Reviewer: This doesn't seem right. Here you are creating multiple threads that essentially try to upload the same set of files; that is, `_upload_from_list` uses the same `files_list` over and over again. Individual file uploads should be distributed among the threads, not the whole uploading procedure duplicated.

Author (Owner): I have changed the code so that `files_list` and `total_files` are no longer passed to every thread as arguments; they are shared on the instance instead. The `self._files_list[self._file_count]` line doesn't allow duplicate uploads, because the count is incremented with every upload and each thread takes the file from the list at its own index.

Reviewer: How about racing? The loop condition is checked outside the lock, so we cannot eliminate the possibility that, while a file is being uploaded by one thread, another thread starts uploading the same file. This can also lead to an out-of-range exception.