# File: Cura/plugins/UM3NetworkPrinting/tests/Cloud/TestCloudApiClient.py (120 lines, 4.7 KiB, Python)

# Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
import os
from typing import List
from unittest import TestCase
from unittest.mock import patch, MagicMock
from cura.CuraApplication import CuraApplication
from src.Cloud.CloudApiClient import CloudApiClient
from src.Cloud.Models.CloudClusterResponse import CloudClusterResponse
from src.Cloud.Models.CloudClusterStatus import CloudClusterStatus
from src.Cloud.Models.CloudPrintJobResponse import CloudPrintJobResponse
from src.Cloud.Models.CloudPrintJobUploadRequest import CloudPrintJobUploadRequest
from src.Cloud.Models.CloudErrorObject import CloudErrorObject
from tests.Cloud.Fixtures import readFixture, parseFixture
from .NetworkManagerMock import NetworkManagerMock
class TestCloudApiClient(TestCase):
    """Tests CloudApiClient against a NetworkManagerMock instead of a real network access manager.

    Each test queues a canned reply on the mock, calls one client method with a callback that records
    what it receives, flushes the mock's replies and then checks what the callback recorded.
    """

    def _errorHandler(self, errors: List[CloudErrorObject]):
        """Error callback handed to the client in setUp.

        Raises unconditionally so that an error reported by the client is not silently ignored.
        """
        raise Exception("Received unexpected error: {}".format(errors))

    def setUp(self) -> None:
        """Create a mocked logged-in account, the network mock and the client under test."""
        super().setUp()
        self.account = MagicMock()
        self.account.isLoggedIn.return_value = True

        self.app = CuraApplication.getInstance()
        self.network = NetworkManagerMock()
        # Patch the network manager only while the client is constructed, so the client is built with the mock.
        with patch("src.Cloud.CloudApiClient.QNetworkAccessManager", return_value = self.network):
            self.api = CloudApiClient(self.account, self._errorHandler)

    def test_getClusters(self) -> None:
        """getClusters passes one CloudClusterResponse per cluster in the fixture to its callback."""
        result = []

        response = readFixture("getClusters")
        data = parseFixture("getClusters")["data"]
        # Guard against an empty fixture: the comparison below would otherwise pass without checking anything.
        self.assertTrue(data, "The getClusters fixture must contain at least one cluster")

        self.network.prepareReply("GET", "https://api-staging.ultimaker.com/connect/v1/clusters", 200, response)
        # the callback is a function that adds the result of the call to getClusters to the result list
        self.api.getClusters(lambda clusters: result.extend(clusters))

        self.network.flushReplies()

        # Build the expectation from every fixture entry rather than from hard-coded indices.
        expected = [CloudClusterResponse(**cluster) for cluster in data]
        self.assertEqual(expected, result)

    def test_getClusterStatus(self) -> None:
        """getClusterStatus passes a CloudClusterStatus built from the fixture data to its callback."""
        result = []

        response = readFixture("getClusterStatusResponse")
        data = parseFixture("getClusterStatusResponse")["data"]

        self.network.prepareReply("GET",
                                  "https://api-staging.ultimaker.com/connect/v1/clusters/R0YcLJwar1ugh0ikEZsZs8NWKV6vJP_LdYsXgXqAcaNC/status",
                                  200, response
                                  )
        self.api.getClusterStatus("R0YcLJwar1ugh0ikEZsZs8NWKV6vJP_LdYsXgXqAcaNC", lambda s: result.append(s))

        self.network.flushReplies()

        self.assertEqual([CloudClusterStatus(**data)], result)

    def test_requestUpload(self) -> None:
        """requestUpload passes a single response with the expected content type and status to its callback."""
        results = []

        response = readFixture("putJobUploadResponse")

        self.network.prepareReply("PUT", "https://api-staging.ultimaker.com/cura/v1/jobs/upload", 200, response)
        request = CloudPrintJobUploadRequest(job_name = "job name", file_size = 143234, content_type = "text/plain")
        self.api.requestUpload(request, lambda r: results.append(r))
        self.network.flushReplies()

        # Comparing lists also checks that the callback was invoked exactly once.
        self.assertEqual(["text/plain"], [r.content_type for r in results])
        self.assertEqual(["uploading"], [r.status for r in results])

    def test_uploadMesh(self) -> None:
        """uploadMesh invokes its finished callback exactly once after the replies have been flushed."""
        results = []
        progress = MagicMock()

        data = parseFixture("putJobUploadResponse")["data"]
        upload_response = CloudPrintJobResponse(**data)

        # Network client doesn't look into the reply
        self.network.prepareReply("PUT", upload_response.upload_url, 200, b'{}')

        # 400,000 bytes of payload.
        mesh = ("1234" * 100000).encode()
        self.api.uploadMesh(upload_response, mesh, lambda: results.append("sent"), progress.advance, progress.error)

        # Flush repeatedly and queue a new reply after every flush.
        # NOTE(review): presumably the mesh is sent in several PUT requests - confirm against uploadMesh.
        for _ in range(10):
            self.network.flushReplies()
            self.network.prepareReply("PUT", upload_response.upload_url, 200, b'{}')

        self.assertEqual(["sent"], results)

    def test_requestPrint(self) -> None:
        """requestPrint passes a single response with the expected job id, cluster job id and status."""
        results = []

        response = readFixture("postJobPrintResponse")

        cluster_id = "NWKV6vJP_LdYsXgXqAcaNCR0YcLJwar1ugh0ikEZsZs8"
        cluster_job_id = "9a59d8e9-91d3-4ff6-b4cb-9db91c4094dd"
        job_id = "ABCDefGHIjKlMNOpQrSTUvYxWZ0-1234567890abcDE="

        self.network.prepareReply("POST",
                                  "https://api-staging.ultimaker.com/connect/v1/clusters/{}/print/{}"
                                  .format(cluster_id, job_id),
                                  200, response)

        self.api.requestPrint(cluster_id, job_id, lambda r: results.append(r))

        self.network.flushReplies()

        # Comparing lists also checks that the callback was invoked exactly once.
        self.assertEqual([job_id], [r.job_id for r in results])
        self.assertEqual([cluster_job_id], [r.cluster_job_id for r in results])
        self.assertEqual(["queued"], [r.status for r in results])