# Copyright (c) 2018 Ultimaker B.V. # Cura is released under the terms of the LGPLv3 or higher. import json from unittest import TestCase from unittest.mock import patch, MagicMock from UM.Scene.SceneNode import SceneNode from UM.Signal import Signal from cura.CuraApplication import CuraApplication from cura.PrinterOutput.PrinterOutputModel import PrinterOutputModel from src.Cloud.CloudApiClient import CloudApiClient from src.Cloud.CloudOutputDevice import CloudOutputDevice from src.Cloud.Models.CloudClusterResponse import CloudClusterResponse from tests.Cloud.Fixtures import readFixture, parseFixture from .NetworkManagerMock import NetworkManagerMock class TestCloudOutputDevice(TestCase): maxDiff = None CLUSTER_ID = "RIZ6cZbWA_Ua7RZVJhrdVfVpf0z-MqaSHQE4v8aRTtYq" JOB_ID = "ABCDefGHIjKlMNOpQrSTUvYxWZ0-1234567890abcDE=" HOST_NAME = "ultimakersystem-ccbdd30044ec" HOST_GUID = "e90ae0ac-1257-4403-91ee-a44c9b7e8050" BASE_URL = "https://api-staging.ultimaker.com" STATUS_URL = "{}/connect/v1/clusters/{}/status".format(BASE_URL, CLUSTER_ID) PRINT_URL = "{}/connect/v1/clusters/{}/print/{}".format(BASE_URL, CLUSTER_ID, JOB_ID) REQUEST_UPLOAD_URL = "{}/cura/v1/jobs/upload".format(BASE_URL) def setUp(self): super().setUp() self.app = CuraApplication.getInstance() self.backend = MagicMock(backendStateChange = Signal()) self.app.setBackend(self.backend) self.cluster = CloudClusterResponse(self.CLUSTER_ID, self.HOST_GUID, self.HOST_NAME, is_online=True, status="active") self.network = NetworkManagerMock() self.account = MagicMock(isLoggedIn=True, accessToken="TestAccessToken") self.onError = MagicMock() with patch("src.Cloud.CloudApiClient.QNetworkAccessManager", return_value = self.network): self._api = CloudApiClient(self.account, self.onError) self.device = CloudOutputDevice(self._api, self.cluster) self.cluster_status = parseFixture("getClusterStatusResponse") self.network.prepareReply("GET", self.STATUS_URL, 200, readFixture("getClusterStatusResponse")) def tearDown(self): super().tearDown() self.network.flushReplies() def test_status(self): self.device._update() self.network.flushReplies() self.assertEqual([PrinterOutputModel, PrinterOutputModel], [type(printer) for printer in self.device.printers]) controller_fields = { "_output_device": self.device, "can_abort": False, "can_control_manually": False, "can_pause": False, "can_pre_heat_bed": False, "can_pre_heat_hotends": False, "can_send_raw_gcode": False, "can_update_firmware": False, } self.assertEqual({printer["uuid"] for printer in self.cluster_status["data"]["printers"]}, {printer.key for printer in self.device.printers}) self.assertEqual([controller_fields, controller_fields], [printer.getController().__dict__ for printer in self.device.printers]) self.assertEqual(["UM3PrintJobOutputModel"], [type(printer).__name__ for printer in self.device.printJobs]) self.assertEqual({job["uuid"] for job in self.cluster_status["data"]["print_jobs"]}, {job.key for job in self.device.printJobs}) self.assertEqual({job["owner"] for job in self.cluster_status["data"]["print_jobs"]}, {job.owner for job in self.device.printJobs}) self.assertEqual({job["name"] for job in self.cluster_status["data"]["print_jobs"]}, {job.name for job in self.device.printJobs}) def test_remove_print_job(self): self.device._update() self.network.flushReplies() self.assertEqual(1, len(self.device.printJobs)) self.cluster_status["data"]["print_jobs"].clear() self.network.prepareReply("GET", self.STATUS_URL, 200, self.cluster_status) self.device._last_request_time = None self.device._update() self.network.flushReplies() self.assertEqual([], self.device.printJobs) def test_remove_printers(self): self.device._update() self.network.flushReplies() self.assertEqual(2, len(self.device.printers)) self.cluster_status["data"]["printers"].clear() self.network.prepareReply("GET", self.STATUS_URL, 200, self.cluster_status) self.device._last_request_time = None self.device._update() self.network.flushReplies() self.assertEqual([], self.device.printers) @patch("cura.CuraApplication.CuraApplication.getGlobalContainerStack") def test_print_to_cloud(self, global_container_stack_mock): active_machine_mock = global_container_stack_mock.return_value active_machine_mock.getMetaDataEntry.side_effect = {"file_formats": "application/gzip"}.get request_upload_response = parseFixture("putJobUploadResponse") request_print_response = parseFixture("postJobPrintResponse") self.network.prepareReply("PUT", self.REQUEST_UPLOAD_URL, 201, request_upload_response) self.network.prepareReply("PUT", request_upload_response["data"]["upload_url"], 201, b"{}") self.network.prepareReply("POST", self.PRINT_URL, 200, request_print_response) file_handler = MagicMock() file_handler.getSupportedFileTypesWrite.return_value = [{ "extension": "gcode.gz", "mime_type": "application/gzip", "mode": 2, }] file_handler.getWriterByMimeType.return_value.write.side_effect = \ lambda stream, nodes: stream.write(str(nodes).encode()) scene_nodes = [SceneNode()] expected_mesh = str(scene_nodes).encode() self.device.requestWrite(scene_nodes, file_handler=file_handler, file_name="FileName") self.network.flushReplies() self.assertEqual( {"data": {"content_type": "application/gzip", "file_size": len(expected_mesh), "job_name": "FileName"}}, json.loads(self.network.getRequestBody("PUT", self.REQUEST_UPLOAD_URL).decode()) ) self.assertEqual(expected_mesh, self.network.getRequestBody("PUT", request_upload_response["data"]["upload_url"])) self.assertIsNone(self.network.getRequestBody("POST", self.PRINT_URL))