STAR-322: Implementing multi-part upload (doesn't always work)

Daniel Schiavini 2018-12-12 17:31:08 +01:00
parent 0467756ed6
commit fed779d0d2
6 changed files with 161 additions and 67 deletions

View File

@@ -9,6 +9,7 @@ from PyQt5.QtNetwork import QNetworkRequest, QNetworkReply
from UM.Logger import Logger
from cura.API import Account
from cura.NetworkClient import NetworkClient
from .ResumableUpload import ResumableUpload
from ..Models import BaseModel
from .Models.CloudClusterResponse import CloudClusterResponse
from .Models.CloudErrorObject import CloudErrorObject
@@ -69,24 +70,10 @@ class CloudApiClient(NetworkClient):
# \param on_finished: The function to be called after the result is parsed. It receives the print job ID.
# \param on_progress: A function to be called during upload progress. It receives a percentage (0-100).
# \param on_error: A function to be called if the upload fails. It receives a dict with the error.
def uploadMesh(self, upload_response: CloudPrintJobResponse, mesh: bytes, on_finished: Callable[[str], Any],
on_progress: Callable[[int], Any], on_error: Callable[[dict], Any]):
def progressCallback(bytes_sent: int, bytes_total: int) -> None:
if bytes_total:
on_progress(int((bytes_sent / bytes_total) * 100))
def finishedCallback(reply: QNetworkReply):
status_code, response = self._parseReply(reply)
if status_code < 300:
on_finished(upload_response.job_id)
else:
Logger.log("e", "Received unexpected response %s uploading mesh: %s", status_code, response)
on_error(response)
# TODO: Multipart upload
self.put(upload_response.upload_url, data = mesh, content_type = upload_response.content_type,
on_finished = finishedCallback, on_progress = progressCallback)
def uploadMesh(self, upload_response: CloudPrintJobResponse, mesh: bytes, on_finished: Callable[[], Any],
on_progress: Callable[[int], Any], on_error: Callable[[], Any]):
ResumableUpload(upload_response.upload_url, upload_response.content_type, mesh, on_finished,
on_progress, on_error).start()
# Requests a cluster to print the given print job.
# \param cluster_id: The ID of the cluster.

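The hunk above narrows the uploadMesh callback contract: on_finished no longer receives the print job ID and on_error no longer receives the parsed error dict. A minimal sketch of the old and new signatures; example_caller and upload_mesh are illustrative stand-ins, not part of the plugin:

from typing import Any, Callable

# Old contract: on_finished received the print job ID, on_error the error dict.
OldOnFinished = Callable[[str], Any]
OldOnError = Callable[[dict], Any]

# New contract: neither callback takes arguments, so the caller binds the job ID itself.
NewOnFinished = Callable[[], Any]
NewOnError = Callable[[], Any]

def example_caller(upload_mesh: Callable[..., None], job_id: str, mesh: bytes) -> None:
    # upload_mesh stands in for CloudApiClient.uploadMesh; the lambda keeps the
    # job ID in scope even though on_finished no longer receives it, mirroring
    # what CloudOutputDevice._onPrintJobCreated does in the next file.
    upload_mesh(mesh,
                on_finished = lambda: print("job", job_id, "uploaded"),
                on_progress = lambda percentage: print("uploaded", percentage, "%"),
                on_error = lambda: print("upload failed"))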
View File

@@ -18,6 +18,7 @@ from cura.PrinterOutput.PrinterOutputModel import PrinterOutputModel
from plugins.UM3NetworkPrinting.src.Cloud.CloudOutputController import CloudOutputController
from ..MeshFormatHandler import MeshFormatHandler
from ..UM3PrintJobOutputModel import UM3PrintJobOutputModel
from .CloudProgressMessage import CloudProgressMessage
from .CloudApiClient import CloudApiClient
from .Models.CloudClusterStatus import CloudClusterStatus
from .Models.CloudPrintJobUploadRequest import CloudPrintJobUploadRequest
@@ -43,9 +44,6 @@ class T:
COULD_NOT_EXPORT = _I18N_CATALOG.i18nc("@info:status", "Could not export print job.")
SENDING_DATA_TEXT = _I18N_CATALOG.i18nc("@info:status", "Sending data to remote cluster")
SENDING_DATA_TITLE = _I18N_CATALOG.i18nc("@info:status", "Sending data to remote cluster")
ERROR = _I18N_CATALOG.i18nc("@info:title", "Error")
UPLOAD_ERROR = _I18N_CATALOG.i18nc("@info:text", "Could not upload the data to the printer.")
@@ -68,7 +66,7 @@ class T:
class CloudOutputDevice(NetworkedPrinterOutputDevice):
# The interval with which the remote clusters are checked
CHECK_CLUSTER_INTERVAL = 4.0 # seconds
CHECK_CLUSTER_INTERVAL = 5.0 # seconds
# Signal triggered when the print jobs in the queue were changed.
printJobsChanged = pyqtSignal()
@@ -109,9 +107,7 @@ class CloudOutputDevice(NetworkedPrinterOutputDevice):
self._number_of_extruders = 2 # All networked printers are dual-extrusion Ultimaker machines.
# We only allow a single upload at a time.
self._sending_job = False
# TODO: handle progress messages in another class.
self._progress_message = None # type: Optional[Message]
self._progress = CloudProgressMessage()
# Keep server string of the last generated time to avoid updating models more than once for the same response
self._received_printers = None # type: Optional[List[CloudClusterPrinterStatus]]
@@ -149,7 +145,7 @@ class CloudOutputDevice(NetworkedPrinterOutputDevice):
file_handler: Optional[FileHandler] = None, **kwargs: str) -> None:
# Show an error message if we're already sending a job.
if self._sending_job:
if self._progress.visible:
self._onUploadError(T.BLOCKED_UPLOADING)
return
@@ -286,53 +282,31 @@ class CloudOutputDevice(NetworkedPrinterOutputDevice):
# \param mesh: The bytes to upload.
# \param job_response: The response received from the cloud API.
def _onPrintJobCreated(self, mesh: bytes, job_response: CloudPrintJobResponse) -> None:
self._api.uploadMesh(job_response, mesh, self._onPrintJobUploaded, self._updateUploadProgress,
lambda _: self._onUploadError(T.UPLOAD_ERROR))
self._progress.show()
self._api.uploadMesh(job_response, mesh, lambda: self._onPrintJobUploaded(job_response.job_id),
self._progress.update, self._onUploadError)
## Requests the print to be sent to the printer once the mesh has been uploaded.
# \param job_id: The ID of the job.
def _onPrintJobUploaded(self, job_id: str) -> None:
self._api.requestPrint(self._device_id, job_id, self._onUploadSuccess)
## Updates the progress of the mesh upload.
# \param progress: The amount of percentage points uploaded until now (0-100).
def _updateUploadProgress(self, progress: int) -> None:
if not self._progress_message:
self._progress_message = Message(
text = T.SENDING_DATA_TEXT,
title = T.SENDING_DATA_TITLE,
progress = -1,
lifetime = 0,
dismissable = False,
use_inactivity_timer = False
)
self._progress_message.setProgress(progress)
self._progress_message.show()
## Hides the upload progress bar
def _resetUploadProgress(self) -> None:
if self._progress_message:
self._progress_message.hide()
self._progress_message = None
## Displays the given message if uploading the mesh has failed
# \param message: The message to display.
def _onUploadError(self, message: str = None) -> None:
self._resetUploadProgress()
if message:
Message(
text = message,
title = T.ERROR,
lifetime = 10
).show()
self._sending_job = False # the upload has finished so we're not sending a job anymore
def _onUploadError(self, message: Optional[str] = None) -> None:
self._progress.hide()
Message(
text = message or T.UPLOAD_ERROR,
title = T.ERROR,
lifetime = 10
).show()
self.writeError.emit()
## Shows a message when the upload has succeeded
# \param response: The response from the cloud API.
def _onUploadSuccess(self, response: CloudPrintResponse) -> None:
Logger.log("i", "The cluster will be printing this print job with the ID %s", response.cluster_job_id)
self._resetUploadProgress()
self._progress.hide()
Message(
text = T.UPLOAD_SUCCESS_TEXT,
title = T.UPLOAD_SUCCESS_TITLE,

View File

@@ -0,0 +1,37 @@
# Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
from UM import i18nCatalog
from UM.Message import Message
## Class that contains all the translations for this module.
class T:
_I18N_CATALOG = i18nCatalog("cura")
SENDING_DATA_TEXT = _I18N_CATALOG.i18nc("@info:status", "Sending data to remote cluster")
SENDING_DATA_TITLE = _I18N_CATALOG.i18nc("@info:status", "Sending data to remote cluster")
class CloudProgressMessage(Message):
def __init__(self):
super().__init__(
text = T.SENDING_DATA_TEXT,
title = T.SENDING_DATA_TITLE,
progress = -1,
lifetime = 0,
dismissable = False,
use_inactivity_timer = False
)
def show(self):
self.setProgress(0)
super().show()
def update(self, percentage: int) -> None:
if not self._visible:
super().show()
self.setProgress(percentage)
@property
def visible(self) -> bool:
return self._visible

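CloudProgressMessage wraps the progress Message lifecycle that CloudOutputDevice previously managed inline. A short usage sketch, assuming it runs inside the plugin's Cloud package (hence the relative import) with Cura's Message system available; progress mirrors self._progress in the CloudOutputDevice hunk above:

from .CloudProgressMessage import CloudProgressMessage

progress = CloudProgressMessage()

progress.show()        # resets the bar to 0% and displays the message
progress.update(42)    # re-shows the message if it was hidden, then sets it to 42%
if progress.visible:   # the guard that replaced the old self._sending_job flag
    pass               # an upload is already running, so refuse to start another
progress.hide()        # called from _onUploadError and _onUploadSuccess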
View File

@@ -0,0 +1,94 @@
# Copyright (c) 2018 Ultimaker B.V.
# !/usr/bin/env python
# -*- coding: utf-8 -*-
from PyQt5.QtNetwork import QNetworkRequest, QNetworkReply
from typing import Optional, Callable, Any, Tuple
from UM.Logger import Logger
from cura.NetworkClient import NetworkClient
class ResumableUpload(NetworkClient):
MAX_RETRIES = 10
BYTES_PER_REQUEST = 256 * 1024
RETRY_HTTP_CODES = {500, 502, 503, 504}
## Creates a resumable upload.
# \param url: The URL to which we shall upload.
# \param content_type: The content type of the data, e.g. "application/octet-stream".
# \param data: The bytes to be uploaded in chunks.
# \param on_finished: Called without arguments when the upload is complete.
# \param on_progress: Called with a percentage (0-100) as the upload progresses.
# \param on_error: Called without arguments if the upload fails.
def __init__(self, url: str, content_type: str, data: bytes,
on_finished: Callable[[], Any], on_progress: Callable[[int], Any], on_error: Callable[[], Any]):
super().__init__()
self._url = url
self._content_type = content_type
self._data = data
self._on_finished = on_finished
self._on_progress = on_progress
self._on_error = on_error
self._sent_bytes = 0
self._retries = 0
self._finished = False
## We override _createEmptyRequest to set the Content-Range header for the current chunk.
# \param path: The URL to request.
# \param content_type: The type of the body contents (ignored here; self._content_type is used instead).
def _createEmptyRequest(self, path: str, content_type: Optional[str] = "application/json") -> QNetworkRequest:
request = super()._createEmptyRequest(path, content_type = self._content_type)
first_byte, last_byte = self._chunkRange()
content_range = "bytes {}-{}/{}".format(first_byte, last_byte - 1, len(self._data))
request.setRawHeader(b"Content-Range", content_range.encode())
Logger.log("i", "Uploading %s to %s", content_range, self._url)
return request
def _chunkRange(self) -> Tuple[int, int]:
last_byte = min(len(self._data), self._sent_bytes + self.BYTES_PER_REQUEST)
return self._sent_bytes, last_byte
def start(self) -> None:
self._uploadChunk()
def _uploadChunk(self) -> None:
if self._finished:
raise ValueError("The upload is already finished")
first_byte, last_byte = self._chunkRange()
Logger.log("i", "PUT %s - %s", first_byte, last_byte)
self.put(self._url, data = self._data[first_byte:last_byte], content_type = self._content_type,
on_finished = self.finishedCallback, on_progress = self.progressCallback)
def progressCallback(self, bytes_sent: int, bytes_total: int) -> None:
if bytes_total:
self._on_progress(int((self._sent_bytes + bytes_sent) / len(self._data) * 100))
def finishedCallback(self, reply: QNetworkReply) -> None:
status_code = reply.attribute(QNetworkRequest.HttpStatusCodeAttribute)
if self._retries < self.MAX_RETRIES and status_code in self.RETRY_HTTP_CODES:
self._retries += 1
Logger.log("i", "Retrying %s/%s request %s", self._retries, self.MAX_RETRIES, self._url)
self._uploadChunk()
return
body = bytes(reply.readAll()).decode()
Logger.log("w", "status_code: %s, Headers: %s, body: %s", status_code,
[bytes(header).decode() for header in reply.rawHeaderList()], body)
if status_code > 308:
self._finished = True
Logger.log("e", "Received error while uploading: %s", body)
self._on_error()
return
first_byte, last_byte = self._chunkRange()
self._sent_bytes += last_byte - first_byte
self._finished = self._sent_bytes >= len(self._data)
if self._finished:
self._on_finished()
else:
self._uploadChunk()

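ResumableUpload splits the payload into 256 KiB PUT requests and advertises each slice with a Content-Range header. A self-contained sketch of that arithmetic; chunk_header is an illustrative helper, not part of the class:

BYTES_PER_REQUEST = 256 * 1024  # 262144 bytes, as in ResumableUpload

def chunk_header(sent_bytes: int, total_bytes: int) -> str:
    # Mirrors _chunkRange() and _createEmptyRequest(): the last byte in a
    # Content-Range header is inclusive, hence the "- 1".
    last_byte = min(total_bytes, sent_bytes + BYTES_PER_REQUEST)
    return "bytes {}-{}/{}".format(sent_bytes, last_byte - 1, total_bytes)

print(chunk_header(0, 600000))       # bytes 0-262143/600000
print(chunk_header(262144, 600000))  # bytes 262144-524287/600000
print(chunk_header(524288, 600000))  # bytes 524288-599999/600000

After each successful reply, _sent_bytes advances by the size of the chunk just sent; once it reaches the total length, on_finished fires, otherwise the next chunk is PUT.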
View File

@@ -76,7 +76,8 @@ class NetworkManagerMock:
## Emits the signal that the reply is ready to all prepared replies.
def flushReplies(self) -> None:
for reply in self.replies.values():
for key, reply in self.replies.items():
Logger.log("i", "Flushing reply to {} {}", *key)
self.finished.emit(reply)
self.reset()

View File

@@ -89,16 +89,17 @@ class TestCloudApiClient(TestCase):
data = parseFixture("putJobUploadResponse")["data"]
upload_response = CloudPrintJobResponse(**data)
self.network.prepareReply("PUT", upload_response.upload_url, 200,
b'{ data : "" }') # Network client doesn't look into the reply
# Network client doesn't look into the reply
self.network.prepareReply("PUT", upload_response.upload_url, 200, b'{}')
self.api.uploadMesh(upload_response, b'', lambda job_id: results.append(job_id),
progress.advance, progress.error)
mesh = ("1234" * 100000).encode()
self.api.uploadMesh(upload_response, mesh, lambda: results.append("sent"), progress.advance, progress.error)
self.network.flushReplies()
for _ in range(10):
self.network.flushReplies()
self.network.prepareReply("PUT", upload_response.upload_url, 200, b'{}')
self.assertEqual(len(results), 1)
self.assertEqual(results[0], upload_response.job_id)
self.assertEqual(["sent"], results)
def test_requestPrint(self, network_mock):
network_mock.return_value = self.network
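The loop in the updated upload test exists because the upload is no longer a single PUT: each flushReplies() call delivers the prepared reply for one chunk, so the test keeps preparing a fresh 200 reply until every chunk has been answered. A short worked example of the numbers involved; the variable names are illustrative:

import math

mesh_size = len(("1234" * 100000).encode())  # 400000 bytes, as in the test
chunk_size = 256 * 1024                      # ResumableUpload.BYTES_PER_REQUEST
chunks = math.ceil(mesh_size / chunk_size)   # 2 PUT requests

# Only after the reply to the second chunk does on_finished run and append
# "sent" to results, which is what the final assertion checks.
print(chunks)  # 2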