From fed779d0d2cfd6a81fa5747a0a10debbcdb8c8ec Mon Sep 17 00:00:00 2001 From: Daniel Schiavini Date: Wed, 12 Dec 2018 17:31:08 +0100 Subject: [PATCH] STAR-322: Implementing multi-part upload (doesnt always work) --- .../src/Cloud/CloudApiClient.py | 23 +---- .../src/Cloud/CloudOutputDevice.py | 56 +++-------- .../src/Cloud/CloudProgressMessage.py | 37 ++++++++ .../src/Cloud/ResumableUpload.py | 94 +++++++++++++++++++ .../tests/Cloud/NetworkManagerMock.py | 3 +- .../tests/Cloud/TestCloudApiClient.py | 15 +-- 6 files changed, 161 insertions(+), 67 deletions(-) create mode 100644 plugins/UM3NetworkPrinting/src/Cloud/CloudProgressMessage.py create mode 100644 plugins/UM3NetworkPrinting/src/Cloud/ResumableUpload.py diff --git a/plugins/UM3NetworkPrinting/src/Cloud/CloudApiClient.py b/plugins/UM3NetworkPrinting/src/Cloud/CloudApiClient.py index b08bac6670..2637f17010 100644 --- a/plugins/UM3NetworkPrinting/src/Cloud/CloudApiClient.py +++ b/plugins/UM3NetworkPrinting/src/Cloud/CloudApiClient.py @@ -9,6 +9,7 @@ from PyQt5.QtNetwork import QNetworkRequest, QNetworkReply from UM.Logger import Logger from cura.API import Account from cura.NetworkClient import NetworkClient +from .ResumableUpload import ResumableUpload from ..Models import BaseModel from .Models.CloudClusterResponse import CloudClusterResponse from .Models.CloudErrorObject import CloudErrorObject @@ -69,24 +70,10 @@ class CloudApiClient(NetworkClient): # \param on_finished: The function to be called after the result is parsed. It receives the print job ID. # \param on_progress: A function to be called during upload progress. It receives a percentage (0-100). # \param on_error: A function to be called if the upload fails. It receives a dict with the error. - def uploadMesh(self, upload_response: CloudPrintJobResponse, mesh: bytes, on_finished: Callable[[str], Any], - on_progress: Callable[[int], Any], on_error: Callable[[dict], Any]): - - def progressCallback(bytes_sent: int, bytes_total: int) -> None: - if bytes_total: - on_progress(int((bytes_sent / bytes_total) * 100)) - - def finishedCallback(reply: QNetworkReply): - status_code, response = self._parseReply(reply) - if status_code < 300: - on_finished(upload_response.job_id) - else: - Logger.log("e", "Received unexpected response %s uploading mesh: %s", status_code, response) - on_error(response) - - # TODO: Multipart upload - self.put(upload_response.upload_url, data = mesh, content_type = upload_response.content_type, - on_finished = finishedCallback, on_progress = progressCallback) + def uploadMesh(self, upload_response: CloudPrintJobResponse, mesh: bytes, on_finished: Callable[[], Any], + on_progress: Callable[[int], Any], on_error: Callable[[], Any]): + ResumableUpload(upload_response.upload_url, upload_response.content_type, mesh, on_finished, + on_progress, on_error).start() # Requests a cluster to print the given print job. # \param cluster_id: The ID of the cluster. diff --git a/plugins/UM3NetworkPrinting/src/Cloud/CloudOutputDevice.py b/plugins/UM3NetworkPrinting/src/Cloud/CloudOutputDevice.py index c4ab752163..e75989a6a8 100644 --- a/plugins/UM3NetworkPrinting/src/Cloud/CloudOutputDevice.py +++ b/plugins/UM3NetworkPrinting/src/Cloud/CloudOutputDevice.py @@ -18,6 +18,7 @@ from cura.PrinterOutput.PrinterOutputModel import PrinterOutputModel from plugins.UM3NetworkPrinting.src.Cloud.CloudOutputController import CloudOutputController from ..MeshFormatHandler import MeshFormatHandler from ..UM3PrintJobOutputModel import UM3PrintJobOutputModel +from .CloudProgressMessage import CloudProgressMessage from .CloudApiClient import CloudApiClient from .Models.CloudClusterStatus import CloudClusterStatus from .Models.CloudPrintJobUploadRequest import CloudPrintJobUploadRequest @@ -43,9 +44,6 @@ class T: COULD_NOT_EXPORT = _I18N_CATALOG.i18nc("@info:status", "Could not export print job.") - SENDING_DATA_TEXT = _I18N_CATALOG.i18nc("@info:status", "Sending data to remote cluster") - SENDING_DATA_TITLE = _I18N_CATALOG.i18nc("@info:status", "Sending data to remote cluster") - ERROR = _I18N_CATALOG.i18nc("@info:title", "Error") UPLOAD_ERROR = _I18N_CATALOG.i18nc("@info:text", "Could not upload the data to the printer.") @@ -68,7 +66,7 @@ class T: class CloudOutputDevice(NetworkedPrinterOutputDevice): # The interval with which the remote clusters are checked - CHECK_CLUSTER_INTERVAL = 4.0 # seconds + CHECK_CLUSTER_INTERVAL = 5.0 # seconds # Signal triggered when the print jobs in the queue were changed. printJobsChanged = pyqtSignal() @@ -109,9 +107,7 @@ class CloudOutputDevice(NetworkedPrinterOutputDevice): self._number_of_extruders = 2 # All networked printers are dual-extrusion Ultimaker machines. # We only allow a single upload at a time. - self._sending_job = False - # TODO: handle progress messages in another class. - self._progress_message = None # type: Optional[Message] + self._progress = CloudProgressMessage() # Keep server string of the last generated time to avoid updating models more than once for the same response self._received_printers = None # type: Optional[List[CloudClusterPrinterStatus]] @@ -149,7 +145,7 @@ class CloudOutputDevice(NetworkedPrinterOutputDevice): file_handler: Optional[FileHandler] = None, **kwargs: str) -> None: # Show an error message if we're already sending a job. - if self._sending_job: + if self._progress.visible: self._onUploadError(T.BLOCKED_UPLOADING) return @@ -286,53 +282,31 @@ class CloudOutputDevice(NetworkedPrinterOutputDevice): # \param mesh: The bytes to upload. # \param job_response: The response received from the cloud API. def _onPrintJobCreated(self, mesh: bytes, job_response: CloudPrintJobResponse) -> None: - self._api.uploadMesh(job_response, mesh, self._onPrintJobUploaded, self._updateUploadProgress, - lambda _: self._onUploadError(T.UPLOAD_ERROR)) + self._progress.show() + self._api.uploadMesh(job_response, mesh, lambda: self._onPrintJobUploaded(job_response.job_id), + self._progress.update, self._onUploadError) ## Requests the print to be sent to the printer when we finished uploading the mesh. # \param job_id: The ID of the job. def _onPrintJobUploaded(self, job_id: str) -> None: self._api.requestPrint(self._device_id, job_id, self._onUploadSuccess) - ## Updates the progress of the mesh upload. - # \param progress: The amount of percentage points uploaded until now (0-100). - def _updateUploadProgress(self, progress: int) -> None: - if not self._progress_message: - self._progress_message = Message( - text = T.SENDING_DATA_TEXT, - title = T.SENDING_DATA_TITLE, - progress = -1, - lifetime = 0, - dismissable = False, - use_inactivity_timer = False - ) - self._progress_message.setProgress(progress) - self._progress_message.show() - - ## Hides the upload progress bar - def _resetUploadProgress(self) -> None: - if self._progress_message: - self._progress_message.hide() - self._progress_message = None - ## Displays the given message if uploading the mesh has failed # \param message: The message to display. - def _onUploadError(self, message: str = None) -> None: - self._resetUploadProgress() - if message: - Message( - text = message, - title = T.ERROR, - lifetime = 10 - ).show() - self._sending_job = False # the upload has finished so we're not sending a job anymore + def _onUploadError(self, message = None) -> None: + self._progress.hide() + Message( + text = message or T.UPLOAD_ERROR, + title = T.ERROR, + lifetime = 10 + ).show() self.writeError.emit() ## Shows a message when the upload has succeeded # \param response: The response from the cloud API. def _onUploadSuccess(self, response: CloudPrintResponse) -> None: Logger.log("i", "The cluster will be printing this print job with the ID %s", response.cluster_job_id) - self._resetUploadProgress() + self._progress.hide() Message( text = T.UPLOAD_SUCCESS_TEXT, title = T.UPLOAD_SUCCESS_TITLE, diff --git a/plugins/UM3NetworkPrinting/src/Cloud/CloudProgressMessage.py b/plugins/UM3NetworkPrinting/src/Cloud/CloudProgressMessage.py new file mode 100644 index 0000000000..e3e0cefc0c --- /dev/null +++ b/plugins/UM3NetworkPrinting/src/Cloud/CloudProgressMessage.py @@ -0,0 +1,37 @@ +# Copyright (c) 2018 Ultimaker B.V. +# Cura is released under the terms of the LGPLv3 or higher. +from UM import i18nCatalog +from UM.Message import Message + + +## Class that contains all the translations for this module. +class T: + _I18N_CATALOG = i18nCatalog("cura") + + SENDING_DATA_TEXT = _I18N_CATALOG.i18nc("@info:status", "Sending data to remote cluster") + SENDING_DATA_TITLE = _I18N_CATALOG.i18nc("@info:status", "Sending data to remote cluster") + + +class CloudProgressMessage(Message): + def __init__(self): + super().__init__( + text = T.SENDING_DATA_TEXT, + title = T.SENDING_DATA_TITLE, + progress = -1, + lifetime = 0, + dismissable = False, + use_inactivity_timer = False + ) + + def show(self): + self.setProgress(0) + super().show() + + def update(self, percentage: int) -> None: + if not self._visible: + super().show() + self.setProgress(percentage) + + @property + def visible(self) -> bool: + return self._visible diff --git a/plugins/UM3NetworkPrinting/src/Cloud/ResumableUpload.py b/plugins/UM3NetworkPrinting/src/Cloud/ResumableUpload.py new file mode 100644 index 0000000000..52b8e5c2d7 --- /dev/null +++ b/plugins/UM3NetworkPrinting/src/Cloud/ResumableUpload.py @@ -0,0 +1,94 @@ +# Copyright (c) 2018 Ultimaker B.V. +# !/usr/bin/env python +# -*- coding: utf-8 -*- +from PyQt5.QtNetwork import QNetworkRequest, QNetworkReply +from typing import Optional, Callable, Any, Tuple + +from UM.Logger import Logger +from cura.NetworkClient import NetworkClient + + +class ResumableUpload(NetworkClient): + MAX_RETRIES = 10 + BYTES_PER_REQUEST = 256 * 1024 + RETRY_HTTP_CODES = {500, 502, 503, 504} + + ## Creates a resumable upload + # \param url: The URL to which we shall upload. + # \param content_length: The total content length of the file, in bytes. + # \param http_method: The HTTP method to be used, e.g. "POST" or "PUT". + # \param timeout: The timeout for each chunk upload. Important: If None, no timeout is applied at all. + def __init__(self, url: str, content_type: str, data: bytes, + on_finished: Callable[[], Any], on_progress: Callable[[int], Any], on_error: Callable[[], Any]): + super().__init__() + self._url = url + self._content_type = content_type + self._data = data + + self._on_finished = on_finished + self._on_progress = on_progress + self._on_error = on_error + + self._sent_bytes = 0 + self._retries = 0 + self._finished = False + + ## We override _createEmptyRequest in order to add the user credentials. + # \param url: The URL to request + # \param content_type: The type of the body contents. + def _createEmptyRequest(self, path: str, content_type: Optional[str] = "application/json") -> QNetworkRequest: + request = super()._createEmptyRequest(path, content_type = self._content_type) + + first_byte, last_byte = self._chunkRange() + content_range = "bytes {}-{}/{}".format(first_byte, last_byte - 1, len(self._data)) + request.setRawHeader(b"Content-Range", content_range.encode()) + Logger.log("i", "Uploading %s to %s", content_range, self._url) + + return request + + def _chunkRange(self) -> Tuple[int, int]: + last_byte = min(len(self._data), self._sent_bytes + self.BYTES_PER_REQUEST) + return self._sent_bytes, last_byte + + def start(self) -> None: + self._uploadChunk() + + def _uploadChunk(self) -> None: + if self._finished: + raise ValueError("The upload is already finished") + + first_byte, last_byte = self._chunkRange() + Logger.log("i", "PUT %s - %s", first_byte, last_byte) + self.put(self._url, data = self._data[first_byte:last_byte], content_type = self._content_type, + on_finished = self.finishedCallback, on_progress = self.progressCallback) + + def progressCallback(self, bytes_sent: int, bytes_total: int) -> None: + if bytes_total: + self._on_progress(int((self._sent_bytes + bytes_sent) / len(self._data) * 100)) + + def finishedCallback(self, reply: QNetworkReply) -> None: + status_code = reply.attribute(QNetworkRequest.HttpStatusCodeAttribute) + + if self._retries < self.MAX_RETRIES and status_code in self.RETRY_HTTP_CODES: + self._retries += 1 + Logger.log("i", "Retrying %s/%s request %s", tries, self.MAX_RETRIES, request.url) + self._uploadChunk() + return + + body = bytes(reply.readAll()).decode() + Logger.log("w", "status_code: %s, Headers: %s, body: %s", status_code, + [bytes(header).decode() for header in reply.rawHeaderList()], body) + + if status_code > 308: + self._finished = True + Logger.log("e", "Received error while uploading: %s", body) + self._on_error() + return + + first_byte, last_byte = self._chunkRange() + self._sent_bytes += last_byte - first_byte + self._finished = self._sent_bytes >= len(self._data) + if self._finished: + self._on_finished() + else: + self._uploadChunk() diff --git a/plugins/UM3NetworkPrinting/tests/Cloud/NetworkManagerMock.py b/plugins/UM3NetworkPrinting/tests/Cloud/NetworkManagerMock.py index 94cc239c0a..60627cbe7c 100644 --- a/plugins/UM3NetworkPrinting/tests/Cloud/NetworkManagerMock.py +++ b/plugins/UM3NetworkPrinting/tests/Cloud/NetworkManagerMock.py @@ -76,7 +76,8 @@ class NetworkManagerMock: ## Emits the signal that the reply is ready to all prepared replies. def flushReplies(self) -> None: - for reply in self.replies.values(): + for key, reply in self.replies.items(): + Logger.log("i", "Flushing reply to {} {}", *key) self.finished.emit(reply) self.reset() diff --git a/plugins/UM3NetworkPrinting/tests/Cloud/TestCloudApiClient.py b/plugins/UM3NetworkPrinting/tests/Cloud/TestCloudApiClient.py index d673554640..e377627465 100644 --- a/plugins/UM3NetworkPrinting/tests/Cloud/TestCloudApiClient.py +++ b/plugins/UM3NetworkPrinting/tests/Cloud/TestCloudApiClient.py @@ -89,16 +89,17 @@ class TestCloudApiClient(TestCase): data = parseFixture("putJobUploadResponse")["data"] upload_response = CloudPrintJobResponse(**data) - self.network.prepareReply("PUT", upload_response.upload_url, 200, - b'{ data : "" }') # Network client doesn't look into the reply + # Network client doesn't look into the reply + self.network.prepareReply("PUT", upload_response.upload_url, 200, b'{}') - self.api.uploadMesh(upload_response, b'', lambda job_id: results.append(job_id), - progress.advance, progress.error) + mesh = ("1234" * 100000).encode() + self.api.uploadMesh(upload_response, mesh, lambda: results.append("sent"), progress.advance, progress.error) - self.network.flushReplies() + for _ in range(10): + self.network.flushReplies() + self.network.prepareReply("PUT", upload_response.upload_url, 200, b'{}') - self.assertEqual(len(results), 1) - self.assertEqual(results[0], upload_response.job_id) + self.assertEqual(["sent"], results) def test_requestPrint(self, network_mock): network_mock.return_value = self.network