mirror of
https://git.mirrors.martin98.com/https://github.com/Ultimaker/Cura
synced 2025-05-29 17:45:45 +08:00
Merge pull request #4032 from Ultimaker/fix_tests_cura_engine_backend
Fix MyPy issues
This commit is contained in:
commit
c36d089113
1
Jenkinsfile
vendored
1
Jenkinsfile
vendored
@ -1,5 +1,6 @@
|
||||
parallel_nodes(['linux && cura', 'windows && cura']) {
|
||||
timeout(time: 2, unit: "HOURS") {
|
||||
|
||||
// Prepare building
|
||||
stage('Prepare') {
|
||||
// Ensure we start with a clean build directory.
|
||||
|
@ -21,6 +21,7 @@ from UM.Scene.Iterator.DepthFirstIterator import DepthFirstIterator
|
||||
from UM.Settings.Interfaces import DefinitionContainerInterface
|
||||
from UM.Settings.SettingInstance import SettingInstance #For typing.
|
||||
from UM.Tool import Tool #For typing.
|
||||
from UM.Mesh.MeshData import MeshData #For typing.
|
||||
|
||||
from cura.CuraApplication import CuraApplication
|
||||
from cura.Settings.ExtruderManager import ExtruderManager
|
||||
@ -62,15 +63,15 @@ class CuraEngineBackend(QObject, Backend):
|
||||
if Platform.isLinux() and not default_engine_location:
|
||||
if not os.getenv("PATH"):
|
||||
raise OSError("There is something wrong with your Linux installation.")
|
||||
for pathdir in os.getenv("PATH").split(os.pathsep):
|
||||
for pathdir in cast(str, os.getenv("PATH")).split(os.pathsep):
|
||||
execpath = os.path.join(pathdir, executable_name)
|
||||
if os.path.exists(execpath):
|
||||
default_engine_location = execpath
|
||||
break
|
||||
|
||||
self._application = CuraApplication.getInstance() #type: CuraApplication
|
||||
self._multi_build_plate_model = None #type: MultiBuildPlateModel
|
||||
self._machine_error_checker = None #type: MachineErrorChecker
|
||||
self._multi_build_plate_model = None #type: Optional[MultiBuildPlateModel]
|
||||
self._machine_error_checker = None #type: Optional[MachineErrorChecker]
|
||||
|
||||
if not default_engine_location:
|
||||
raise EnvironmentError("Could not find CuraEngine")
|
||||
@ -120,7 +121,7 @@ class CuraEngineBackend(QObject, Backend):
|
||||
self._engine_is_fresh = True #type: bool # Is the newly started engine used before or not?
|
||||
|
||||
self._backend_log_max_lines = 20000 #type: int # Maximum number of lines to buffer
|
||||
self._error_message = None #type: Message # Pop-up message that shows errors.
|
||||
self._error_message = None #type: Optional[Message] # Pop-up message that shows errors.
|
||||
self._last_num_objects = defaultdict(int) #type: Dict[int, int] # Count number of objects to see if there is something changed
|
||||
self._postponed_scene_change_sources = [] #type: List[SceneNode] # scene change is postponed (by a tool)
|
||||
|
||||
@ -145,7 +146,9 @@ class CuraEngineBackend(QObject, Backend):
|
||||
self._multi_build_plate_model = self._application.getMultiBuildPlateModel()
|
||||
|
||||
self._application.getController().activeViewChanged.connect(self._onActiveViewChanged)
|
||||
self._multi_build_plate_model.activeBuildPlateChanged.connect(self._onActiveViewChanged)
|
||||
|
||||
if self._multi_build_plate_model:
|
||||
self._multi_build_plate_model.activeBuildPlateChanged.connect(self._onActiveViewChanged)
|
||||
|
||||
self._application.globalContainerStackChanged.connect(self._onGlobalStackChanged)
|
||||
self._onGlobalStackChanged()
|
||||
@ -246,7 +249,7 @@ class CuraEngineBackend(QObject, Backend):
|
||||
if self._application.getPrintInformation() and build_plate_to_be_sliced == active_build_plate:
|
||||
self._application.getPrintInformation().setToZeroPrintInformation(build_plate_to_be_sliced)
|
||||
|
||||
if self._process is None:
|
||||
if self._process is None: # type: ignore
|
||||
self._createSocket()
|
||||
self.stopSlicing()
|
||||
self._engine_is_fresh = False # Yes we're going to use the engine
|
||||
@ -284,12 +287,12 @@ class CuraEngineBackend(QObject, Backend):
|
||||
if self._application.getUseExternalBackend():
|
||||
return
|
||||
|
||||
if self._process is not None:
|
||||
if self._process is not None: # type: ignore
|
||||
Logger.log("d", "Killing engine process")
|
||||
try:
|
||||
self._process.terminate()
|
||||
Logger.log("d", "Engine process is killed. Received return code %s", self._process.wait())
|
||||
self._process = None
|
||||
self._process.terminate() # type: ignore
|
||||
Logger.log("d", "Engine process is killed. Received return code %s", self._process.wait()) # type: ignore
|
||||
self._process = None # type: ignore
|
||||
|
||||
except Exception as e: # terminating a process that is already terminating causes an exception, silently ignore this.
|
||||
Logger.log("d", "Exception occurred while trying to kill the engine %s", str(e))
|
||||
@ -328,6 +331,9 @@ class CuraEngineBackend(QObject, Backend):
|
||||
|
||||
if job.getResult() == StartJobResult.SettingError:
|
||||
if self._application.platformActivity:
|
||||
if not self._global_container_stack:
|
||||
Logger.log("w", "Global container stack not assigned to CuraEngineBackend!")
|
||||
return
|
||||
extruders = list(ExtruderManager.getInstance().getMachineExtruders(self._global_container_stack.getId()))
|
||||
error_keys = [] #type: List[str]
|
||||
for extruder in extruders:
|
||||
@ -361,6 +367,9 @@ class CuraEngineBackend(QObject, Backend):
|
||||
if not stack:
|
||||
continue
|
||||
for key in stack.getErrorKeys():
|
||||
if not self._global_container_stack:
|
||||
Logger.log("e", "CuraEngineBackend does not have global_container_stack assigned.")
|
||||
continue
|
||||
definition = cast(DefinitionContainerInterface, self._global_container_stack.getBottom()).findDefinitions(key = key)
|
||||
if not definition:
|
||||
Logger.log("e", "When checking settings for errors, unable to find definition for key {key} in per-object stack.".format(key = key))
|
||||
@ -409,7 +418,8 @@ class CuraEngineBackend(QObject, Backend):
|
||||
# Notify the user that it's now up to the backend to do it's job
|
||||
self.backendStateChange.emit(BackendState.Processing)
|
||||
|
||||
Logger.log("d", "Sending slice message took %s seconds", time() - self._slice_start_time )
|
||||
if self._slice_start_time:
|
||||
Logger.log("d", "Sending slice message took %s seconds", time() - self._slice_start_time )
|
||||
|
||||
## Determine enable or disable auto slicing. Return True for enable timer and False otherwise.
|
||||
# It disables when
|
||||
@ -476,15 +486,12 @@ class CuraEngineBackend(QObject, Backend):
|
||||
else:
|
||||
# we got a single scenenode
|
||||
if not source.callDecoration("isGroup"):
|
||||
if source.getMeshData() is None:
|
||||
return
|
||||
if source.getMeshData().getVertices() is None:
|
||||
mesh_data = source.getMeshData()
|
||||
if mesh_data and mesh_data.getVertices() is None:
|
||||
return
|
||||
|
||||
build_plate_changed.add(source_build_plate_number)
|
||||
|
||||
build_plate_changed.discard(None)
|
||||
build_plate_changed.discard(-1) # object not on build plate
|
||||
if not build_plate_changed:
|
||||
return
|
||||
|
||||
@ -577,9 +584,10 @@ class CuraEngineBackend(QObject, Backend):
|
||||
#
|
||||
# \param message The protobuf message containing sliced layer data.
|
||||
def _onOptimizedLayerMessage(self, message: Arcus.PythonMessage) -> None:
|
||||
if self._start_slice_job_build_plate not in self._stored_optimized_layer_data:
|
||||
self._stored_optimized_layer_data[self._start_slice_job_build_plate] = []
|
||||
self._stored_optimized_layer_data[self._start_slice_job_build_plate].append(message)
|
||||
if self._start_slice_job_build_plate:
|
||||
if self._start_slice_job_build_plate not in self._stored_optimized_layer_data:
|
||||
self._stored_optimized_layer_data[self._start_slice_job_build_plate] = []
|
||||
self._stored_optimized_layer_data[self._start_slice_job_build_plate].append(message)
|
||||
|
||||
## Called when a progress message is received from the engine.
|
||||
#
|
||||
@ -619,7 +627,8 @@ class CuraEngineBackend(QObject, Backend):
|
||||
gcode_list[index] = replaced
|
||||
|
||||
self._slicing = False
|
||||
Logger.log("d", "Slicing took %s seconds", time() - self._slice_start_time )
|
||||
if self._slice_start_time:
|
||||
Logger.log("d", "Slicing took %s seconds", time() - self._slice_start_time )
|
||||
Logger.log("d", "Number of models per buildplate: %s", dict(self._numObjectsPerBuildPlate()))
|
||||
|
||||
# See if we need to process the sliced layers job.
|
||||
@ -658,7 +667,11 @@ class CuraEngineBackend(QObject, Backend):
|
||||
## Creates a new socket connection.
|
||||
def _createSocket(self, protocol_file: str = None) -> None:
|
||||
if not protocol_file:
|
||||
protocol_file = os.path.abspath(os.path.join(PluginRegistry.getInstance().getPluginPath(self.getPluginId()), "Cura.proto"))
|
||||
plugin_path = PluginRegistry.getInstance().getPluginPath(self.getPluginId())
|
||||
if not plugin_path:
|
||||
Logger.log("e", "Could not get plugin path!", self.getPluginId())
|
||||
return
|
||||
protocol_file = os.path.abspath(os.path.join(plugin_path, "Cura.proto"))
|
||||
super()._createSocket(protocol_file)
|
||||
self._engine_is_fresh = True
|
||||
|
||||
@ -773,9 +786,9 @@ class CuraEngineBackend(QObject, Backend):
|
||||
# We should reset our state and start listening for new connections.
|
||||
def _onBackendQuit(self) -> None:
|
||||
if not self._restart:
|
||||
if self._process:
|
||||
Logger.log("d", "Backend quit with return code %s. Resetting process and socket.", self._process.wait())
|
||||
self._process = None
|
||||
if self._process: # type: ignore
|
||||
Logger.log("d", "Backend quit with return code %s. Resetting process and socket.", self._process.wait()) # type: ignore
|
||||
self._process = None # type: ignore
|
||||
|
||||
## Called when the global container stack changes
|
||||
def _onGlobalStackChanged(self) -> None:
|
||||
@ -831,6 +844,9 @@ class CuraEngineBackend(QObject, Backend):
|
||||
self._change_timer.start()
|
||||
|
||||
def _extruderChanged(self) -> None:
|
||||
if not self._multi_build_plate_model:
|
||||
Logger.log("w", "CuraEngineBackend does not have multi_build_plate_model assigned!")
|
||||
return
|
||||
for build_plate_number in range(self._multi_build_plate_model.maxBuildPlate + 1):
|
||||
if build_plate_number not in self._build_plates_to_be_sliced:
|
||||
self._build_plates_to_be_sliced.append(build_plate_number)
|
||||
|
@ -5,7 +5,7 @@ import numpy
|
||||
from string import Formatter
|
||||
from enum import IntEnum
|
||||
import time
|
||||
from typing import Any, Dict, List, Optional, Set
|
||||
from typing import Any, cast, Dict, List, Optional, Set
|
||||
import re
|
||||
import Arcus #For typing.
|
||||
|
||||
@ -209,12 +209,15 @@ class StartSliceJob(Job):
|
||||
if temp_list:
|
||||
object_groups.append(temp_list)
|
||||
|
||||
extruders_enabled = {position: stack.isEnabled for position, stack in CuraApplication.getInstance().getGlobalContainerStack().extruders.items()}
|
||||
global_stack = CuraApplication.getInstance().getGlobalContainerStack()
|
||||
if not global_stack:
|
||||
return
|
||||
extruders_enabled = {position: stack.isEnabled for position, stack in global_stack.extruders.items()}
|
||||
filtered_object_groups = []
|
||||
has_model_with_disabled_extruders = False
|
||||
associated_disabled_extruders = set()
|
||||
for group in object_groups:
|
||||
stack = CuraApplication.getInstance().getGlobalContainerStack()
|
||||
stack = global_stack
|
||||
skip_group = False
|
||||
for node in group:
|
||||
extruder_position = node.callDecoration("getActiveExtruderPosition")
|
||||
@ -318,7 +321,7 @@ class StartSliceJob(Job):
|
||||
# \param default_extruder_nr Stack nr to use when no stack nr is specified, defaults to the global stack
|
||||
def _expandGcodeTokens(self, value: str, default_extruder_nr: int = -1) -> str:
|
||||
if not self._all_extruders_settings:
|
||||
global_stack = CuraApplication.getInstance().getGlobalContainerStack()
|
||||
global_stack = cast(ContainerStack, CuraApplication.getInstance().getGlobalContainerStack())
|
||||
|
||||
# NB: keys must be strings for the string formatter
|
||||
self._all_extruders_settings = {
|
||||
|
@ -286,6 +286,7 @@ class FlavorParser:
|
||||
self._cancelled = False
|
||||
# We obtain the filament diameter from the selected extruder to calculate line widths
|
||||
global_stack = CuraApplication.getInstance().getGlobalContainerStack()
|
||||
|
||||
if not global_stack:
|
||||
return None
|
||||
|
||||
|
@ -11,7 +11,7 @@ from UM.i18n import i18nCatalog
|
||||
catalog = i18nCatalog("cura")
|
||||
|
||||
# Ignore windows error popups. Fixes the whole "Can't open drive X" when user has an SD card reader.
|
||||
ctypes.windll.kernel32.SetErrorMode(1)
|
||||
ctypes.windll.kernel32.SetErrorMode(1) #type: ignore
|
||||
|
||||
# WinAPI Constants that we need
|
||||
# Hardcoded here due to stupid WinDLL stuff that does not give us access to these values.
|
||||
@ -29,7 +29,7 @@ OPEN_EXISTING = 3 # [CodeStyle: Windows Enum value]
|
||||
|
||||
# Setup the DeviceIoControl function arguments and return type.
|
||||
# See ctypes documentation for details on how to call C functions from python, and why this is important.
|
||||
ctypes.windll.kernel32.DeviceIoControl.argtypes = [
|
||||
ctypes.windll.kernel32.DeviceIoControl.argtypes = [ #type: ignore
|
||||
wintypes.HANDLE, # _In_ HANDLE hDevice
|
||||
wintypes.DWORD, # _In_ DWORD dwIoControlCode
|
||||
wintypes.LPVOID, # _In_opt_ LPVOID lpInBuffer
|
||||
@ -39,7 +39,7 @@ ctypes.windll.kernel32.DeviceIoControl.argtypes = [
|
||||
ctypes.POINTER(wintypes.DWORD), # _Out_opt_ LPDWORD lpBytesReturned
|
||||
wintypes.LPVOID # _Inout_opt_ LPOVERLAPPED lpOverlapped
|
||||
]
|
||||
ctypes.windll.kernel32.DeviceIoControl.restype = wintypes.BOOL
|
||||
ctypes.windll.kernel32.DeviceIoControl.restype = wintypes.BOOL #type: ignore
|
||||
|
||||
|
||||
## Removable drive support for windows
|
||||
|
@ -16,7 +16,7 @@ from UM.i18n import i18nCatalog
|
||||
from UM.Logger import Logger
|
||||
from UM.PluginRegistry import PluginRegistry
|
||||
from UM.Qt.Duration import DurationFormat
|
||||
|
||||
from typing import cast, Optional
|
||||
from .SliceInfoJob import SliceInfoJob
|
||||
|
||||
|
||||
@ -79,11 +79,16 @@ class SliceInfo(QObject, Extension):
|
||||
return dialog
|
||||
|
||||
@pyqtSlot(result = str)
|
||||
def getExampleData(self) -> str:
|
||||
def getExampleData(self) -> Optional[str]:
|
||||
if self._example_data_content is None:
|
||||
file_path = os.path.join(PluginRegistry.getInstance().getPluginPath(self.getPluginId()), "example_data.json")
|
||||
with open(file_path, "r", encoding = "utf-8") as f:
|
||||
self._example_data_content = f.read()
|
||||
plugin_path = PluginRegistry.getInstance().getPluginPath(self.getPluginId())
|
||||
if not plugin_path:
|
||||
Logger.log("e", "Could not get plugin path!", self.getPluginId())
|
||||
return None
|
||||
file_path = os.path.join(plugin_path, "example_data.json")
|
||||
if file_path:
|
||||
with open(file_path, "r", encoding = "utf-8") as f:
|
||||
self._example_data_content = f.read()
|
||||
return self._example_data_content
|
||||
|
||||
@pyqtSlot(bool)
|
||||
|
@ -6,7 +6,7 @@ import json
|
||||
import os
|
||||
import tempfile
|
||||
import platform
|
||||
from typing import List
|
||||
from typing import cast, List
|
||||
|
||||
from PyQt5.QtCore import QUrl, QObject, pyqtProperty, pyqtSignal, pyqtSlot
|
||||
from PyQt5.QtNetwork import QNetworkAccessManager, QNetworkRequest, QNetworkReply
|
||||
@ -240,7 +240,10 @@ class Toolbox(QObject, Extension):
|
||||
if not plugin_path:
|
||||
return None
|
||||
path = os.path.join(plugin_path, "resources", "qml", qml_name)
|
||||
|
||||
dialog = self._application.createQmlComponent(path, {"toolbox": self})
|
||||
if not dialog:
|
||||
raise Exception("Failed to create toolbox dialog")
|
||||
return dialog
|
||||
|
||||
|
||||
|
@ -1,7 +1,7 @@
|
||||
# Copyright (c) 2018 Ultimaker B.V.
|
||||
# Cura is released under the terms of the LGPLv3 or higher.
|
||||
|
||||
from typing import Any, cast, Set, Tuple, Union
|
||||
from typing import Any, cast, Optional, Set, Tuple, Union
|
||||
|
||||
from UM.FileHandler.FileHandler import FileHandler
|
||||
from UM.FileHandler.FileWriter import FileWriter #To choose based on the output file mode (text vs. binary).
|
||||
@ -9,6 +9,7 @@ from UM.FileHandler.WriteFileJob import WriteFileJob #To call the file writer as
|
||||
from UM.Logger import Logger
|
||||
from UM.Settings.ContainerRegistry import ContainerRegistry
|
||||
from UM.i18n import i18nCatalog
|
||||
from UM.Mesh.MeshWriter import MeshWriter # For typing
|
||||
from UM.Message import Message
|
||||
from UM.Qt.Duration import Duration, DurationFormat
|
||||
from UM.OutputDevice import OutputDeviceError #To show that something went wrong when writing.
|
||||
@ -104,10 +105,11 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice):
|
||||
file_formats = CuraApplication.getInstance().getMeshFileHandler().getSupportedFileTypesWrite()
|
||||
|
||||
global_stack = CuraApplication.getInstance().getGlobalContainerStack()
|
||||
#Create a list from the supported file formats string.
|
||||
if not global_stack:
|
||||
Logger.log("e", "Missing global stack!")
|
||||
return
|
||||
|
||||
#Create a list from the supported file formats string.
|
||||
machine_file_formats = global_stack.getMetaDataEntry("file_formats").split(";")
|
||||
machine_file_formats = [file_type.strip() for file_type in machine_file_formats]
|
||||
#Exception for UM3 firmware version >=4.4: UFP is now supported and should be the preferred file format.
|
||||
@ -134,6 +136,9 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice):
|
||||
return
|
||||
|
||||
#This function pauses with the yield, waiting on instructions on which printer it needs to print with.
|
||||
if not writer:
|
||||
Logger.log("e", "Missing file or mesh writer!")
|
||||
return
|
||||
self._sending_job = self._sendPrintJob(writer, preferred_format, nodes)
|
||||
self._sending_job.send(None) #Start the generator.
|
||||
|
||||
@ -213,16 +218,14 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice):
|
||||
yield #To prevent having to catch the StopIteration exception.
|
||||
|
||||
def _sendPrintJobWaitOnWriteJobFinished(self, job: WriteFileJob) -> None:
|
||||
# This is the callback when the job finishes, where the message is created
|
||||
assert(self._write_job_progress_message is not None)
|
||||
self._write_job_progress_message.hide()
|
||||
if self._write_job_progress_message:
|
||||
self._write_job_progress_message.hide()
|
||||
|
||||
self._progress_message = Message(i18n_catalog.i18nc("@info:status", "Sending data to printer"), lifetime = 0, dismissable = False, progress = -1,
|
||||
title = i18n_catalog.i18nc("@info:title", "Sending Data"))
|
||||
self._progress_message.addAction("Abort", i18n_catalog.i18nc("@action:button", "Cancel"), icon = None, description = "")
|
||||
self._progress_message.actionTriggered.connect(self._progressMessageActionTriggered)
|
||||
self._progress_message.show()
|
||||
|
||||
parts = []
|
||||
|
||||
target_printer, preferred_format, stream = self._dummy_lambdas
|
||||
@ -259,7 +262,7 @@ class ClusterUM3OutputDevice(NetworkedPrinterOutputDevice):
|
||||
self.activePrinterChanged.emit()
|
||||
|
||||
def _onPostPrintJobFinished(self, reply: QNetworkReply) -> None:
|
||||
if self._progress_message is not None:
|
||||
if self._progress_message:
|
||||
self._progress_message.hide()
|
||||
self._compressing_gcode = False
|
||||
self._sending_gcode = False
|
||||
|
@ -3,7 +3,7 @@
|
||||
|
||||
import os.path
|
||||
import time
|
||||
from typing import Optional
|
||||
from typing import cast, Optional
|
||||
|
||||
from PyQt5.QtCore import pyqtSignal, pyqtProperty, pyqtSlot, QObject
|
||||
|
||||
|
58
run_mypy.py
58
run_mypy.py
@ -1,16 +1,32 @@
|
||||
#!env python
|
||||
#!/usr/bin/env python
|
||||
import os
|
||||
import sys
|
||||
import subprocess
|
||||
|
||||
|
||||
# A quick Python implementation of unix 'where' command.
|
||||
def where(exeName, searchPath=os.getenv("PATH")):
|
||||
paths = searchPath.split(";" if sys.platform == "win32" else ":")
|
||||
for path in paths:
|
||||
candidatePath = os.path.join(path, exeName)
|
||||
if os.path.exists(candidatePath):
|
||||
return candidatePath
|
||||
return None
|
||||
def where(exe_name: str, search_path: str = os.getenv("PATH")) -> str:
|
||||
if search_path is None:
|
||||
search_path = ""
|
||||
paths = search_path.split(os.pathsep)
|
||||
result = ""
|
||||
print(" -> sys.executable location: %s" % sys.executable)
|
||||
sys_exec_dir = os.path.dirname(sys.executable)
|
||||
root_dir = os.path.dirname(sys_exec_dir)
|
||||
paths += [sys_exec_dir,
|
||||
os.path.join(root_dir, "bin"),
|
||||
os.path.join(root_dir, "scripts"),
|
||||
]
|
||||
paths = set(paths)
|
||||
|
||||
for path in sorted(paths):
|
||||
print(" -> Searching %s" % path)
|
||||
candidate_path = os.path.join(path, exe_name)
|
||||
if os.path.exists(candidate_path):
|
||||
result = candidate_path
|
||||
break
|
||||
return result
|
||||
|
||||
|
||||
def findModules(path):
|
||||
result = []
|
||||
@ -19,6 +35,7 @@ def findModules(path):
|
||||
result.append(entry.name)
|
||||
return result
|
||||
|
||||
|
||||
def main():
|
||||
# Find Uranium via the PYTHONPATH var
|
||||
uraniumUMPath = where("UM", os.getenv("PYTHONPATH"))
|
||||
@ -26,16 +43,19 @@ def main():
|
||||
uraniumUMPath = os.path.join("..", "Uranium")
|
||||
uraniumPath = os.path.dirname(uraniumUMPath)
|
||||
|
||||
mypyPathParts = [".", os.path.join(".", "plugins"), os.path.join(".", "plugins", "VersionUpgrade"),
|
||||
uraniumPath, os.path.join(uraniumPath, "stubs")]
|
||||
mypy_path_parts = [".", os.path.join(".", "plugins"), os.path.join(".", "plugins", "VersionUpgrade"),
|
||||
uraniumPath, os.path.join(uraniumPath, "stubs")]
|
||||
if sys.platform == "win32":
|
||||
os.putenv("MYPYPATH", ";".join(mypyPathParts))
|
||||
os.putenv("MYPYPATH", ";".join(mypy_path_parts))
|
||||
else:
|
||||
os.putenv("MYPYPATH", ":".join(mypyPathParts))
|
||||
os.putenv("MYPYPATH", ":".join(mypy_path_parts))
|
||||
|
||||
# Mypy really needs to be run via its Python script otherwise it can't find its data files.
|
||||
mypyExe = where("mypy.bat" if sys.platform == "win32" else "mypy")
|
||||
mypyModule = os.path.join(os.path.dirname(mypyExe), "mypy")
|
||||
mypy_exe_name = "mypy.exe" if sys.platform == "win32" else "mypy"
|
||||
mypy_exe_dir = where(mypy_exe_name)
|
||||
mypy_module = os.path.join(os.path.dirname(mypy_exe_dir), mypy_exe_name)
|
||||
print("Found mypy exe path: %s" % mypy_exe_dir)
|
||||
print("Found mypy module path: %s" % mypy_module)
|
||||
|
||||
plugins = findModules("plugins")
|
||||
plugins.sort()
|
||||
@ -44,11 +64,17 @@ def main():
|
||||
|
||||
for mod in mods:
|
||||
print("------------- Checking module {mod}".format(**locals()))
|
||||
result = subprocess.run([sys.executable, mypyModule, "-p", mod, "--ignore-missing-imports"])
|
||||
if sys.platform == "win32":
|
||||
result = subprocess.run([mypy_module, "-p", mod, "--ignore-missing-imports"])
|
||||
else:
|
||||
result = subprocess.run([sys.executable, mypy_module, "-p", mod, "--ignore-missing-imports"])
|
||||
if result.returncode != 0:
|
||||
print("\nModule {mod} failed checking. :(".format(**locals()))
|
||||
return 1
|
||||
else:
|
||||
print("\n\nDone checking. All is good.")
|
||||
return 0
|
||||
sys.exit(main())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
|
Loading…
x
Reference in New Issue
Block a user