mirror of
https://git.mirrors.martin98.com/https://github.com/Ultimaker/Cura
synced 2025-05-02 00:34:26 +08:00
119 lines
4.5 KiB
Python
119 lines
4.5 KiB
Python
# Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.
|
|
import os
|
|
from typing import List
|
|
from unittest import TestCase
|
|
from unittest.mock import patch, MagicMock
|
|
|
|
from cura.CuraConstants import CuraCloudAPIRoot
|
|
from src.Cloud.CloudApiClient import CloudApiClient
|
|
from src.Cloud.Models.CloudClusterResponse import CloudClusterResponse
|
|
from src.Cloud.Models.CloudClusterStatus import CloudClusterStatus
|
|
from src.Cloud.Models.CloudPrintJobResponse import CloudPrintJobResponse
|
|
from src.Cloud.Models.CloudPrintJobUploadRequest import CloudPrintJobUploadRequest
|
|
from src.Cloud.Models.CloudError import CloudError
|
|
from tests.Cloud.Fixtures import readFixture, parseFixture
|
|
from .NetworkManagerMock import NetworkManagerMock
|
|
|
|
|
|
class TestCloudApiClient(TestCase):
    """Tests for CloudApiClient with QNetworkAccessManager patched to return a NetworkManagerMock.

    Each test registers a reply on the mock with prepareReply, calls one client
    method with a collecting callback, calls flushReplies, and then checks what
    the callback received.
    """

    maxDiff = None

    # Error callback given to the client in setUp: any reported error raises, failing the test.
    def _errorHandler(self, errors: List[CloudError]):
        message = "Received unexpected error: {}".format(errors)
        raise Exception(message)

    # Builds a mocked account that reports being logged in, and a client wired to the network mock.
    def setUp(self):
        super().setUp()

        account = MagicMock()
        account.isLoggedIn.return_value = True
        self.account = account

        network = NetworkManagerMock()
        self.network = network

        # The patch is only active while the client is being constructed.
        with patch("src.Cloud.CloudApiClient.QNetworkAccessManager", return_value = network):
            self.api = CloudApiClient(account, self._errorHandler)

    # getClusters should hand the callback one CloudClusterResponse per fixture entry.
    def test_getClusters(self):
        received = []

        def on_finished(clusters):
            # Collect whatever getClusters passes to its callback.
            received.extend(clusters)

        raw_reply = readFixture("getClusters")
        clusters_data = parseFixture("getClusters")["data"]
        url = CuraCloudAPIRoot + "/connect/v1/clusters"

        self.network.prepareReply("GET", url, 200, raw_reply)
        self.api.getClusters(on_finished)
        self.network.flushReplies()

        expected = [
            CloudClusterResponse(**clusters_data[0]),
            CloudClusterResponse(**clusters_data[1]),
        ]
        self.assertEqual(expected, received)

    # getClusterStatus should hand the callback a single CloudClusterStatus built from the fixture data.
    def test_getClusterStatus(self):
        received = []

        def on_finished(s):
            received.append(s)

        raw_reply = readFixture("getClusterStatusResponse")
        status_data = parseFixture("getClusterStatusResponse")["data"]

        cluster_id = "R0YcLJwar1ugh0ikEZsZs8NWKV6vJP_LdYsXgXqAcaNC"
        status_path = "/connect/v1/clusters/R0YcLJwar1ugh0ikEZsZs8NWKV6vJP_LdYsXgXqAcaNC/status"
        url = CuraCloudAPIRoot + status_path

        self.network.prepareReply("GET", url, 200, raw_reply)
        self.api.getClusterStatus(cluster_id, on_finished)
        self.network.flushReplies()

        expected = [CloudClusterStatus(**status_data)]
        self.assertEqual(expected, received)

    # requestUpload should deliver exactly one response carrying the content type and status checked below.
    def test_requestUpload(self):
        responses = []

        def on_finished(r):
            responses.append(r)

        raw_reply = readFixture("putJobUploadResponse")
        url = CuraCloudAPIRoot + "/cura/v1/jobs/upload"

        self.network.prepareReply("PUT", url, 200, raw_reply)
        request = CloudPrintJobUploadRequest(job_name = "job name", file_size = 143234, content_type = "text/plain")
        self.api.requestUpload(request, on_finished)
        self.network.flushReplies()

        content_types = [r.content_type for r in responses]
        self.assertEqual(["text/plain"], content_types)

        statuses = [r.status for r in responses]
        self.assertEqual(["uploading"], statuses)

    # uploadMesh should invoke the finished callback exactly once for the whole mesh.
    def test_uploadMesh(self):
        finished_calls = []
        progress = MagicMock()

        def on_finished():
            finished_calls.append("sent")

        job_data = parseFixture("putJobUploadResponse")["data"]
        upload_response = CloudPrintJobResponse(**job_data)

        # The network client doesn't look into the reply, so an empty JSON body is enough.
        self.network.prepareReply("PUT", upload_response.upload_url, 200, b'{}')

        mesh_text = "1234" * 100000
        mesh = mesh_text.encode()
        self.api.uploadMesh(upload_response, mesh, on_finished, progress.advance, progress.error)

        # Flush and re-arm the reply ten times; presumably the mesh is sent in several
        # requests that each need their own reply -- TODO confirm against CloudApiClient.
        for _round in range(10):
            self.network.flushReplies()
            self.network.prepareReply("PUT", upload_response.upload_url, 200, b'{}')

        self.assertEqual(["sent"], finished_calls)

    # requestPrint should deliver one response with the job id, cluster job id and status checked below.
    def test_requestPrint(self):
        responses = []

        def on_finished(r):
            responses.append(r)

        raw_reply = readFixture("postJobPrintResponse")

        cluster_id = "NWKV6vJP_LdYsXgXqAcaNCR0YcLJwar1ugh0ikEZsZs8"
        cluster_job_id = "9a59d8e9-91d3-4ff6-b4cb-9db91c4094dd"
        job_id = "ABCDefGHIjKlMNOpQrSTUvYxWZ0-1234567890abcDE="

        # Only the path template is formatted; the API root is prepended unchanged.
        print_path = "/connect/v1/clusters/{}/print/{}".format(cluster_id, job_id)
        url = CuraCloudAPIRoot + print_path
        self.network.prepareReply("POST", url, 200, raw_reply)

        self.api.requestPrint(cluster_id, job_id, on_finished)
        self.network.flushReplies()

        job_ids = [r.job_id for r in responses]
        self.assertEqual([job_id], job_ids)

        cluster_job_ids = [r.cluster_job_id for r in responses]
        self.assertEqual([cluster_job_id], cluster_job_ids)

        statuses = [r.status for r in responses]
        self.assertEqual(["queued"], statuses)
|