mirror of
https://git.mirrors.martin98.com/https://github.com/Ultimaker/Cura
synced 2025-08-12 14:29:03 +08:00
Merge pull request #10901 from Ultimaker/CURA-8539_oauth_via_httprequestmanager
OAuth via HttpRequestManager
This commit is contained in:
commit
8427555d6a
@ -1,15 +1,15 @@
|
||||
# Copyright (c) 2018 Ultimaker B.V.
|
||||
# Copyright (c) 2021 Ultimaker B.V.
|
||||
# Cura is released under the terms of the LGPLv3 or higher.
|
||||
from datetime import datetime
|
||||
from typing import Any, Optional, Dict, TYPE_CHECKING, Callable
|
||||
|
||||
from datetime import datetime
|
||||
from PyQt5.QtCore import QObject, pyqtSignal, pyqtSlot, pyqtProperty, QTimer, Q_ENUMS
|
||||
from typing import Any, Optional, Dict, TYPE_CHECKING, Callable
|
||||
|
||||
from UM.Logger import Logger
|
||||
from UM.Message import Message
|
||||
from UM.i18n import i18nCatalog
|
||||
from cura.OAuth2.AuthorizationService import AuthorizationService
|
||||
from cura.OAuth2.Models import OAuth2Settings
|
||||
from cura.OAuth2.Models import OAuth2Settings, UserProfile
|
||||
from cura.UltimakerCloud import UltimakerCloudConstants
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@ -46,6 +46,9 @@ class Account(QObject):
|
||||
loginStateChanged = pyqtSignal(bool)
|
||||
"""Signal emitted when user logged in or out"""
|
||||
|
||||
userProfileChanged = pyqtSignal()
|
||||
"""Signal emitted when new account information is available."""
|
||||
|
||||
additionalRightsChanged = pyqtSignal("QVariantMap")
|
||||
"""Signal emitted when a users additional rights change"""
|
||||
|
||||
@ -71,13 +74,14 @@ class Account(QObject):
|
||||
self._application = application
|
||||
self._new_cloud_printers_detected = False
|
||||
|
||||
self._error_message = None # type: Optional[Message]
|
||||
self._error_message: Optional[Message] = None
|
||||
self._logged_in = False
|
||||
self._user_profile: Optional[UserProfile] = None
|
||||
self._additional_rights: Dict[str, Any] = {}
|
||||
self._sync_state = SyncState.IDLE
|
||||
self._manual_sync_enabled = False
|
||||
self._update_packages_enabled = False
|
||||
self._update_packages_action = None # type: Optional[Callable]
|
||||
self._update_packages_action: Optional[Callable] = None
|
||||
self._last_sync_str = "-"
|
||||
|
||||
self._callback_port = 32118
|
||||
@ -103,7 +107,7 @@ class Account(QObject):
|
||||
self._update_timer.setSingleShot(True)
|
||||
self._update_timer.timeout.connect(self.sync)
|
||||
|
||||
self._sync_services = {} # type: Dict[str, int]
|
||||
self._sync_services: Dict[str, int] = {}
|
||||
"""contains entries "service_name" : SyncState"""
|
||||
|
||||
def initialize(self) -> None:
|
||||
@ -196,12 +200,17 @@ class Account(QObject):
|
||||
self._logged_in = logged_in
|
||||
self.loginStateChanged.emit(logged_in)
|
||||
if logged_in:
|
||||
self._authorization_service.getUserProfile(self._onProfileChanged)
|
||||
self._setManualSyncEnabled(False)
|
||||
self._sync()
|
||||
else:
|
||||
if self._update_timer.isActive():
|
||||
self._update_timer.stop()
|
||||
|
||||
def _onProfileChanged(self, profile: Optional[UserProfile]) -> None:
|
||||
self._user_profile = profile
|
||||
self.userProfileChanged.emit()
|
||||
|
||||
def _sync(self) -> None:
|
||||
"""Signals all sync services to start syncing
|
||||
|
||||
@ -243,32 +252,28 @@ class Account(QObject):
|
||||
return
|
||||
self._authorization_service.startAuthorizationFlow(force_logout_before_login)
|
||||
|
||||
@pyqtProperty(str, notify=loginStateChanged)
|
||||
@pyqtProperty(str, notify = userProfileChanged)
|
||||
def userName(self):
|
||||
user_profile = self._authorization_service.getUserProfile()
|
||||
if not user_profile:
|
||||
return None
|
||||
return user_profile.username
|
||||
if not self._user_profile:
|
||||
return ""
|
||||
return self._user_profile.username
|
||||
|
||||
@pyqtProperty(str, notify = loginStateChanged)
|
||||
@pyqtProperty(str, notify = userProfileChanged)
|
||||
def profileImageUrl(self):
|
||||
user_profile = self._authorization_service.getUserProfile()
|
||||
if not user_profile:
|
||||
return None
|
||||
return user_profile.profile_image_url
|
||||
if not self._user_profile:
|
||||
return ""
|
||||
return self._user_profile.profile_image_url
|
||||
|
||||
@pyqtProperty(str, notify=accessTokenChanged)
|
||||
def accessToken(self) -> Optional[str]:
|
||||
return self._authorization_service.getAccessToken()
|
||||
|
||||
@pyqtProperty("QVariantMap", notify = loginStateChanged)
|
||||
@pyqtProperty("QVariantMap", notify = userProfileChanged)
|
||||
def userProfile(self) -> Optional[Dict[str, Optional[str]]]:
|
||||
"""None if no user is logged in otherwise the logged in user as a dict containing containing user_id, username and profile_image_url """
|
||||
|
||||
user_profile = self._authorization_service.getUserProfile()
|
||||
if not user_profile:
|
||||
if not self._user_profile:
|
||||
return None
|
||||
return user_profile.__dict__
|
||||
return self._user_profile.__dict__
|
||||
|
||||
@pyqtProperty(str, notify=lastSyncDateTimeChanged)
|
||||
def lastSyncDateTime(self) -> str:
|
||||
|
@ -1,18 +1,19 @@
|
||||
# Copyright (c) 2021 Ultimaker B.V.
|
||||
# Cura is released under the terms of the LGPLv3 or higher.
|
||||
|
||||
from datetime import datetime
|
||||
import json
|
||||
import secrets
|
||||
from hashlib import sha512
|
||||
from base64 import b64encode
|
||||
from typing import Optional
|
||||
import requests
|
||||
|
||||
from UM.i18n import i18nCatalog
|
||||
from UM.Logger import Logger
|
||||
from datetime import datetime
|
||||
from hashlib import sha512
|
||||
from PyQt5.QtNetwork import QNetworkReply
|
||||
import secrets
|
||||
from typing import Callable, Optional
|
||||
import urllib.parse
|
||||
|
||||
from cura.OAuth2.Models import AuthenticationResponse, UserProfile, OAuth2Settings
|
||||
from UM.i18n import i18nCatalog
|
||||
from UM.Logger import Logger
|
||||
from UM.TaskManagement.HttpRequestManager import HttpRequestManager # To download log-in tokens.
|
||||
|
||||
catalog = i18nCatalog("cura")
|
||||
TOKEN_TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"
|
||||
|
||||
@ -30,14 +31,13 @@ class AuthorizationHelpers:
|
||||
|
||||
return self._settings
|
||||
|
||||
def getAccessTokenUsingAuthorizationCode(self, authorization_code: str, verification_code: str) -> "AuthenticationResponse":
|
||||
"""Request the access token from the authorization server.
|
||||
|
||||
def getAccessTokenUsingAuthorizationCode(self, authorization_code: str, verification_code: str, callback: Callable[[AuthenticationResponse], None]) -> None:
|
||||
"""
|
||||
Request the access token from the authorization server.
|
||||
:param authorization_code: The authorization code from the 1st step.
|
||||
:param verification_code: The verification code needed for the PKCE extension.
|
||||
:return: An AuthenticationResponse object.
|
||||
:param callback: Once the token has been obtained, this function will be called with the response.
|
||||
"""
|
||||
|
||||
data = {
|
||||
"client_id": self._settings.CLIENT_ID if self._settings.CLIENT_ID is not None else "",
|
||||
"redirect_uri": self._settings.CALLBACK_URL if self._settings.CALLBACK_URL is not None else "",
|
||||
@ -46,18 +46,21 @@ class AuthorizationHelpers:
|
||||
"code_verifier": verification_code,
|
||||
"scope": self._settings.CLIENT_SCOPES if self._settings.CLIENT_SCOPES is not None else "",
|
||||
}
|
||||
try:
|
||||
return self.parseTokenResponse(requests.post(self._token_url, data = data)) # type: ignore
|
||||
except requests.exceptions.ConnectionError as connection_error:
|
||||
return AuthenticationResponse(success = False, err_message = f"Unable to connect to remote server: {connection_error}")
|
||||
headers = {"Content-type": "application/x-www-form-urlencoded"}
|
||||
HttpRequestManager.getInstance().post(
|
||||
self._token_url,
|
||||
data = urllib.parse.urlencode(data).encode("UTF-8"),
|
||||
headers_dict = headers,
|
||||
callback = lambda response: self.parseTokenResponse(response, callback),
|
||||
error_callback = lambda response, _: self.parseTokenResponse(response, callback)
|
||||
)
|
||||
|
||||
def getAccessTokenUsingRefreshToken(self, refresh_token: str) -> "AuthenticationResponse":
|
||||
"""Request the access token from the authorization server using a refresh token.
|
||||
|
||||
:param refresh_token:
|
||||
:return: An AuthenticationResponse object.
|
||||
def getAccessTokenUsingRefreshToken(self, refresh_token: str, callback: Callable[[AuthenticationResponse], None]) -> None:
|
||||
"""
|
||||
Request the access token from the authorization server using a refresh token.
|
||||
:param refresh_token: A long-lived token used to refresh the authentication token.
|
||||
:param callback: Once the token has been obtained, this function will be called with the response.
|
||||
"""
|
||||
|
||||
Logger.log("d", "Refreshing the access token for [%s]", self._settings.OAUTH_SERVER_URL)
|
||||
data = {
|
||||
"client_id": self._settings.CLIENT_ID if self._settings.CLIENT_ID is not None else "",
|
||||
@ -66,75 +69,99 @@ class AuthorizationHelpers:
|
||||
"refresh_token": refresh_token,
|
||||
"scope": self._settings.CLIENT_SCOPES if self._settings.CLIENT_SCOPES is not None else "",
|
||||
}
|
||||
try:
|
||||
return self.parseTokenResponse(requests.post(self._token_url, data = data)) # type: ignore
|
||||
except requests.exceptions.ConnectionError:
|
||||
return AuthenticationResponse(success = False, err_message = "Unable to connect to remote server")
|
||||
except OSError as e:
|
||||
return AuthenticationResponse(success = False, err_message = "Operating system is unable to set up a secure connection: {err}".format(err = str(e)))
|
||||
headers = {"Content-type": "application/x-www-form-urlencoded"}
|
||||
HttpRequestManager.getInstance().post(
|
||||
self._token_url,
|
||||
data = urllib.parse.urlencode(data).encode("UTF-8"),
|
||||
headers_dict = headers,
|
||||
callback = lambda response: self.parseTokenResponse(response, callback),
|
||||
error_callback = lambda response, _: self.parseTokenResponse(response, callback)
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def parseTokenResponse(token_response: requests.models.Response) -> "AuthenticationResponse":
|
||||
def parseTokenResponse(self, token_response: QNetworkReply, callback: Callable[[AuthenticationResponse], None]) -> None:
|
||||
"""Parse the token response from the authorization server into an AuthenticationResponse object.
|
||||
|
||||
:param token_response: The JSON string data response from the authorization server.
|
||||
:return: An AuthenticationResponse object.
|
||||
"""
|
||||
|
||||
token_data = None
|
||||
|
||||
try:
|
||||
token_data = json.loads(token_response.text)
|
||||
except ValueError:
|
||||
Logger.log("w", "Could not parse token response data: %s", token_response.text)
|
||||
|
||||
token_data = HttpRequestManager.readJSON(token_response)
|
||||
if not token_data:
|
||||
return AuthenticationResponse(success = False, err_message = catalog.i18nc("@message", "Could not read response."))
|
||||
callback(AuthenticationResponse(success = False, err_message = catalog.i18nc("@message", "Could not read response.")))
|
||||
return
|
||||
|
||||
if token_response.status_code not in (200, 201):
|
||||
return AuthenticationResponse(success = False, err_message = token_data["error_description"])
|
||||
if token_response.error() != QNetworkReply.NetworkError.NoError:
|
||||
callback(AuthenticationResponse(success = False, err_message = token_data["error_description"]))
|
||||
return
|
||||
|
||||
return AuthenticationResponse(success=True,
|
||||
token_type=token_data["token_type"],
|
||||
access_token=token_data["access_token"],
|
||||
refresh_token=token_data["refresh_token"],
|
||||
expires_in=token_data["expires_in"],
|
||||
scope=token_data["scope"],
|
||||
received_at=datetime.now().strftime(TOKEN_TIMESTAMP_FORMAT))
|
||||
callback(AuthenticationResponse(success = True,
|
||||
token_type = token_data["token_type"],
|
||||
access_token = token_data["access_token"],
|
||||
refresh_token = token_data["refresh_token"],
|
||||
expires_in = token_data["expires_in"],
|
||||
scope = token_data["scope"],
|
||||
received_at = datetime.now().strftime(TOKEN_TIMESTAMP_FORMAT)))
|
||||
return
|
||||
|
||||
def parseJWT(self, access_token: str) -> Optional["UserProfile"]:
|
||||
def checkToken(self, access_token: str, success_callback: Optional[Callable[[UserProfile], None]] = None, failed_callback: Optional[Callable[[], None]] = None) -> None:
|
||||
"""Calls the authentication API endpoint to get the token data.
|
||||
|
||||
The API is called asynchronously. When a response is given, the callback is called with the user's profile.
|
||||
:param access_token: The encoded JWT token.
|
||||
:return: Dict containing some profile data.
|
||||
:param success_callback: When a response is given, this function will be called with a user profile. If None,
|
||||
there will not be a callback.
|
||||
:param failed_callback: When the request failed or the response didn't parse, this function will be called.
|
||||
"""
|
||||
|
||||
try:
|
||||
check_token_url = "{}/check-token".format(self._settings.OAUTH_SERVER_URL)
|
||||
Logger.log("d", "Checking the access token for [%s]", check_token_url)
|
||||
token_request = requests.get(check_token_url, headers = {
|
||||
"Authorization": "Bearer {}".format(access_token)
|
||||
})
|
||||
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
|
||||
# Connection was suddenly dropped. Nothing we can do about that.
|
||||
Logger.logException("w", "Something failed while attempting to parse the JWT token")
|
||||
return None
|
||||
if token_request.status_code not in (200, 201):
|
||||
Logger.log("w", "Could not retrieve token data from auth server: %s", token_request.text)
|
||||
return None
|
||||
user_data = token_request.json().get("data")
|
||||
if not user_data or not isinstance(user_data, dict):
|
||||
Logger.log("w", "Could not parse user data from token: %s", user_data)
|
||||
return None
|
||||
|
||||
return UserProfile(
|
||||
user_id = user_data["user_id"],
|
||||
username = user_data["username"],
|
||||
profile_image_url = user_data.get("profile_image_url", ""),
|
||||
organization_id = user_data.get("organization", {}).get("organization_id"),
|
||||
subscriptions = user_data.get("subscriptions", [])
|
||||
check_token_url = "{}/check-token".format(self._settings.OAUTH_SERVER_URL)
|
||||
Logger.log("d", "Checking the access token for [%s]", check_token_url)
|
||||
headers = {
|
||||
"Authorization": f"Bearer {access_token}"
|
||||
}
|
||||
HttpRequestManager.getInstance().get(
|
||||
check_token_url,
|
||||
headers_dict = headers,
|
||||
callback = lambda reply: self._parseUserProfile(reply, success_callback, failed_callback),
|
||||
error_callback = lambda _, _2: failed_callback() if failed_callback is not None else None
|
||||
)
|
||||
|
||||
def _parseUserProfile(self, reply: QNetworkReply, success_callback: Optional[Callable[[UserProfile], None]], failed_callback: Optional[Callable[[], None]] = None) -> None:
|
||||
"""
|
||||
Parses the user profile from a reply to /check-token.
|
||||
|
||||
If the response is valid, the callback will be called to return the user profile to the caller.
|
||||
:param reply: A network reply to a request to the /check-token URL.
|
||||
:param success_callback: A function to call once a user profile was successfully obtained.
|
||||
:param failed_callback: A function to call if parsing the profile failed.
|
||||
"""
|
||||
if reply.error() != QNetworkReply.NetworkError.NoError:
|
||||
Logger.warning(f"Could not access account information. QNetworkError {reply.errorString()}")
|
||||
if failed_callback is not None:
|
||||
failed_callback()
|
||||
return
|
||||
|
||||
profile_data = HttpRequestManager.getInstance().readJSON(reply)
|
||||
if profile_data is None or "data" not in profile_data:
|
||||
Logger.warning("Could not parse user data from token.")
|
||||
if failed_callback is not None:
|
||||
failed_callback()
|
||||
return
|
||||
profile_data = profile_data["data"]
|
||||
|
||||
required_fields = {"user_id", "username"}
|
||||
if "user_id" not in profile_data or "username" not in profile_data:
|
||||
Logger.warning(f"User data missing required field(s): {required_fields - set(profile_data.keys())}")
|
||||
if failed_callback is not None:
|
||||
failed_callback()
|
||||
return
|
||||
|
||||
if success_callback is not None:
|
||||
success_callback(UserProfile(
|
||||
user_id = profile_data["user_id"],
|
||||
username = profile_data["username"],
|
||||
profile_image_url = profile_data.get("profile_image_url", ""),
|
||||
organization_id = profile_data.get("organization", {}).get("organization_id"),
|
||||
subscriptions = profile_data.get("subscriptions", [])
|
||||
))
|
||||
|
||||
@staticmethod
|
||||
def generateVerificationCode(code_length: int = 32) -> str:
|
||||
"""Generate a verification code of arbitrary length.
|
||||
|
@ -2,6 +2,7 @@
|
||||
# Cura is released under the terms of the LGPLv3 or higher.
|
||||
|
||||
from http.server import BaseHTTPRequestHandler
|
||||
from threading import Lock # To turn an asynchronous call synchronous.
|
||||
from typing import Optional, Callable, Tuple, Dict, Any, List, TYPE_CHECKING
|
||||
from urllib.parse import parse_qs, urlparse
|
||||
|
||||
@ -14,6 +15,7 @@ if TYPE_CHECKING:
|
||||
|
||||
catalog = i18nCatalog("cura")
|
||||
|
||||
|
||||
class AuthorizationRequestHandler(BaseHTTPRequestHandler):
|
||||
"""This handler handles all HTTP requests on the local web server.
|
||||
|
||||
@ -24,11 +26,11 @@ class AuthorizationRequestHandler(BaseHTTPRequestHandler):
|
||||
super().__init__(request, client_address, server)
|
||||
|
||||
# These values will be injected by the HTTPServer that this handler belongs to.
|
||||
self.authorization_helpers = None # type: Optional[AuthorizationHelpers]
|
||||
self.authorization_callback = None # type: Optional[Callable[[AuthenticationResponse], None]]
|
||||
self.verification_code = None # type: Optional[str]
|
||||
self.authorization_helpers: Optional[AuthorizationHelpers] = None
|
||||
self.authorization_callback: Optional[Callable[[AuthenticationResponse], None]] = None
|
||||
self.verification_code: Optional[str] = None
|
||||
|
||||
self.state = None # type: Optional[str]
|
||||
self.state: Optional[str] = None
|
||||
|
||||
# CURA-6609: Some browser seems to issue a HEAD instead of GET request as the callback.
|
||||
def do_HEAD(self) -> None:
|
||||
@ -70,13 +72,23 @@ class AuthorizationRequestHandler(BaseHTTPRequestHandler):
|
||||
if state != self.state:
|
||||
token_response = AuthenticationResponse(
|
||||
success = False,
|
||||
err_message=catalog.i18nc("@message",
|
||||
"The provided state is not correct.")
|
||||
err_message = catalog.i18nc("@message", "The provided state is not correct.")
|
||||
)
|
||||
elif code and self.authorization_helpers is not None and self.verification_code is not None:
|
||||
token_response = AuthenticationResponse(
|
||||
success = False,
|
||||
err_message = catalog.i18nc("@message", "Timeout when authenticating with the account server.")
|
||||
)
|
||||
# If the code was returned we get the access token.
|
||||
token_response = self.authorization_helpers.getAccessTokenUsingAuthorizationCode(
|
||||
code, self.verification_code)
|
||||
lock = Lock()
|
||||
lock.acquire()
|
||||
|
||||
def callback(response: AuthenticationResponse) -> None:
|
||||
nonlocal token_response
|
||||
token_response = response
|
||||
lock.release()
|
||||
self.authorization_helpers.getAccessTokenUsingAuthorizationCode(code, self.verification_code, callback)
|
||||
lock.acquire(timeout = 60) # Block thread until request is completed (which releases the lock). If not acquired, the timeout message stays.
|
||||
|
||||
elif self._queryGet(query, "error_code") == "user_denied":
|
||||
# Otherwise we show an error message (probably the user clicked "Deny" in the auth dialog).
|
||||
|
@ -3,10 +3,9 @@
|
||||
|
||||
import json
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Optional, TYPE_CHECKING, Dict
|
||||
from typing import Callable, Dict, Optional, TYPE_CHECKING, Union
|
||||
from urllib.parse import urlencode, quote_plus
|
||||
|
||||
import requests.exceptions
|
||||
from PyQt5.QtCore import QUrl
|
||||
from PyQt5.QtGui import QDesktopServices
|
||||
|
||||
@ -16,7 +15,7 @@ from UM.Signal import Signal
|
||||
from UM.i18n import i18nCatalog
|
||||
from cura.OAuth2.AuthorizationHelpers import AuthorizationHelpers, TOKEN_TIMESTAMP_FORMAT
|
||||
from cura.OAuth2.LocalAuthorizationServer import LocalAuthorizationServer
|
||||
from cura.OAuth2.Models import AuthenticationResponse
|
||||
from cura.OAuth2.Models import AuthenticationResponse, BaseModel
|
||||
|
||||
i18n_catalog = i18nCatalog("cura")
|
||||
|
||||
@ -26,6 +25,7 @@ if TYPE_CHECKING:
|
||||
|
||||
MYCLOUD_LOGOFF_URL = "https://account.ultimaker.com/logoff?utm_source=cura&utm_medium=software&utm_campaign=change-account-before-adding-printers"
|
||||
|
||||
|
||||
class AuthorizationService:
|
||||
"""The authorization service is responsible for handling the login flow, storing user credentials and providing
|
||||
account information.
|
||||
@ -43,12 +43,13 @@ class AuthorizationService:
|
||||
self._settings = settings
|
||||
self._auth_helpers = AuthorizationHelpers(settings)
|
||||
self._auth_url = "{}/authorize".format(self._settings.OAUTH_SERVER_URL)
|
||||
self._auth_data = None # type: Optional[AuthenticationResponse]
|
||||
self._user_profile = None # type: Optional["UserProfile"]
|
||||
self._auth_data: Optional[AuthenticationResponse] = None
|
||||
self._user_profile: Optional["UserProfile"] = None
|
||||
self._preferences = preferences
|
||||
self._server = LocalAuthorizationServer(self._auth_helpers, self._onAuthStateChanged, daemon=True)
|
||||
self._currently_refreshing_token = False # Whether we are currently in the process of refreshing auth. Don't make new requests while busy.
|
||||
|
||||
self._unable_to_get_data_message = None # type: Optional[Message]
|
||||
self._unable_to_get_data_message: Optional[Message] = None
|
||||
|
||||
self.onAuthStateChanged.connect(self._authChanged)
|
||||
|
||||
@ -62,69 +63,80 @@ class AuthorizationService:
|
||||
if self._preferences:
|
||||
self._preferences.addPreference(self._settings.AUTH_DATA_PREFERENCE_KEY, "{}")
|
||||
|
||||
def getUserProfile(self) -> Optional["UserProfile"]:
|
||||
"""Get the user profile as obtained from the JWT (JSON Web Token).
|
||||
def getUserProfile(self, callback: Optional[Callable[[Optional["UserProfile"]], None]] = None) -> None:
|
||||
"""
|
||||
Get the user profile as obtained from the JWT (JSON Web Token).
|
||||
|
||||
If the JWT is not yet parsed, calling this will take care of that.
|
||||
|
||||
:return: UserProfile if a user is logged in, None otherwise.
|
||||
If the JWT is not yet checked and parsed, calling this will take care of that.
|
||||
:param callback: Once the user profile is obtained, this function will be called with the given user profile. If
|
||||
the profile fails to be obtained, this function will be called with None.
|
||||
|
||||
See also: :py:method:`cura.OAuth2.AuthorizationService.AuthorizationService._parseJWT`
|
||||
"""
|
||||
if self._user_profile:
|
||||
# We already obtained the profile. No need to make another request for it.
|
||||
if callback is not None:
|
||||
callback(self._user_profile)
|
||||
return
|
||||
|
||||
if not self._user_profile:
|
||||
# If no user profile was stored locally, we try to get it from JWT.
|
||||
try:
|
||||
self._user_profile = self._parseJWT()
|
||||
except requests.exceptions.ConnectionError:
|
||||
# Unable to get connection, can't login.
|
||||
Logger.logException("w", "Unable to validate user data with the remote server.")
|
||||
return None
|
||||
# If no user profile was stored locally, we try to get it from JWT.
|
||||
def store_profile(profile: Optional["UserProfile"]) -> None:
|
||||
if profile is not None:
|
||||
self._user_profile = profile
|
||||
if callback is not None:
|
||||
callback(profile)
|
||||
elif self._auth_data:
|
||||
# If there is no user profile from the JWT, we have to log in again.
|
||||
Logger.warning("The user profile could not be loaded. The user must log in again!")
|
||||
self.deleteAuthData()
|
||||
if callback is not None:
|
||||
callback(None)
|
||||
else:
|
||||
if callback is not None:
|
||||
callback(None)
|
||||
|
||||
if not self._user_profile and self._auth_data:
|
||||
# If there is still no user profile from the JWT, we have to log in again.
|
||||
Logger.log("w", "The user profile could not be loaded. The user must log in again!")
|
||||
self.deleteAuthData()
|
||||
return None
|
||||
self._parseJWT(callback = store_profile)
|
||||
|
||||
return self._user_profile
|
||||
|
||||
def _parseJWT(self) -> Optional["UserProfile"]:
|
||||
"""Tries to parse the JWT (JSON Web Token) data, which it does if all the needed data is there.
|
||||
|
||||
:return: UserProfile if it was able to parse, None otherwise.
|
||||
def _parseJWT(self, callback: Callable[[Optional["UserProfile"]], None]) -> None:
|
||||
"""
|
||||
Tries to parse the JWT (JSON Web Token) data, which it does if all the needed data is there.
|
||||
:param callback: A function to call asynchronously once the user profile has been obtained. It will be called
|
||||
with `None` if it failed to obtain a user profile.
|
||||
"""
|
||||
|
||||
if not self._auth_data or self._auth_data.access_token is None:
|
||||
# If no auth data exists, we should always log in again.
|
||||
Logger.log("d", "There was no auth data or access token")
|
||||
return None
|
||||
Logger.debug("There was no auth data or access token")
|
||||
callback(None)
|
||||
return
|
||||
|
||||
try:
|
||||
user_data = self._auth_helpers.parseJWT(self._auth_data.access_token)
|
||||
except AttributeError:
|
||||
# THis might seem a bit double, but we get crash reports about this (CURA-2N2 in sentry)
|
||||
Logger.log("d", "There was no auth data or access token")
|
||||
return None
|
||||
# When we checked the token we may get a user profile. This callback checks if that is a valid one and tries to refresh the token if it's not.
|
||||
def check_user_profile(user_profile: Optional["UserProfile"]) -> None:
|
||||
if user_profile:
|
||||
# If the profile was found, we call it back immediately.
|
||||
callback(user_profile)
|
||||
return
|
||||
# The JWT was expired or invalid and we should request a new one.
|
||||
if self._auth_data is None or self._auth_data.refresh_token is None:
|
||||
Logger.warning("There was no refresh token in the auth data.")
|
||||
callback(None)
|
||||
return
|
||||
|
||||
if user_data:
|
||||
# If the profile was found, we return it immediately.
|
||||
return user_data
|
||||
# The JWT was expired or invalid and we should request a new one.
|
||||
if self._auth_data.refresh_token is None:
|
||||
Logger.log("w", "There was no refresh token in the auth data.")
|
||||
return None
|
||||
self._auth_data = self._auth_helpers.getAccessTokenUsingRefreshToken(self._auth_data.refresh_token)
|
||||
if not self._auth_data or self._auth_data.access_token is None:
|
||||
Logger.log("w", "Unable to use the refresh token to get a new access token.")
|
||||
# The token could not be refreshed using the refresh token. We should login again.
|
||||
return None
|
||||
# Ensure it gets stored as otherwise we only have it in memory. The stored refresh token has been deleted
|
||||
# from the server already. Do not store the auth_data if we could not get new auth_data (eg due to a
|
||||
# network error), since this would cause an infinite loop trying to get new auth-data
|
||||
if self._auth_data.success:
|
||||
self._storeAuthData(self._auth_data)
|
||||
return self._auth_helpers.parseJWT(self._auth_data.access_token)
|
||||
def process_auth_data(auth_data: AuthenticationResponse) -> None:
|
||||
if auth_data.access_token is None:
|
||||
Logger.warning("Unable to use the refresh token to get a new access token.")
|
||||
callback(None)
|
||||
return
|
||||
# Ensure it gets stored as otherwise we only have it in memory. The stored refresh token has been
|
||||
# deleted from the server already. Do not store the auth_data if we could not get new auth_data (e.g.
|
||||
# due to a network error), since this would cause an infinite loop trying to get new auth-data.
|
||||
if auth_data.success:
|
||||
self._storeAuthData(auth_data)
|
||||
self._auth_helpers.checkToken(auth_data.access_token, callback, lambda: callback(None))
|
||||
|
||||
self._auth_helpers.getAccessTokenUsingRefreshToken(self._auth_data.refresh_token, process_auth_data)
|
||||
|
||||
self._auth_helpers.checkToken(self._auth_data.access_token, check_user_profile, lambda: check_user_profile(None))
|
||||
|
||||
def getAccessToken(self) -> Optional[str]:
|
||||
"""Get the access token as provided by the response data."""
|
||||
@ -149,13 +161,20 @@ class AuthorizationService:
|
||||
if self._auth_data is None or self._auth_data.refresh_token is None:
|
||||
Logger.log("w", "Unable to refresh access token, since there is no refresh token.")
|
||||
return
|
||||
response = self._auth_helpers.getAccessTokenUsingRefreshToken(self._auth_data.refresh_token)
|
||||
if response.success:
|
||||
self._storeAuthData(response)
|
||||
self.onAuthStateChanged.emit(logged_in = True)
|
||||
else:
|
||||
Logger.log("w", "Failed to get a new access token from the server.")
|
||||
self.onAuthStateChanged.emit(logged_in = False)
|
||||
|
||||
def process_auth_data(response: AuthenticationResponse) -> None:
|
||||
if response.success:
|
||||
self._storeAuthData(response)
|
||||
self.onAuthStateChanged.emit(logged_in = True)
|
||||
else:
|
||||
Logger.warning("Failed to get a new access token from the server.")
|
||||
self.onAuthStateChanged.emit(logged_in = False)
|
||||
|
||||
if self._currently_refreshing_token:
|
||||
Logger.debug("Was already busy refreshing token. Do not start a new request.")
|
||||
return
|
||||
self._currently_refreshing_token = True
|
||||
self._auth_helpers.getAccessTokenUsingRefreshToken(self._auth_data.refresh_token, process_auth_data)
|
||||
|
||||
def deleteAuthData(self) -> None:
|
||||
"""Delete the authentication data that we have stored locally (eg; logout)"""
|
||||
@ -244,21 +263,23 @@ class AuthorizationService:
|
||||
preferences_data = json.loads(self._preferences.getValue(self._settings.AUTH_DATA_PREFERENCE_KEY))
|
||||
if preferences_data:
|
||||
self._auth_data = AuthenticationResponse(**preferences_data)
|
||||
# Also check if we can actually get the user profile information.
|
||||
user_profile = self.getUserProfile()
|
||||
if user_profile is not None:
|
||||
self.onAuthStateChanged.emit(logged_in = True)
|
||||
Logger.log("d", "Auth data was successfully loaded")
|
||||
else:
|
||||
if self._unable_to_get_data_message is not None:
|
||||
self._unable_to_get_data_message.hide()
|
||||
|
||||
self._unable_to_get_data_message = Message(i18n_catalog.i18nc("@info",
|
||||
"Unable to reach the Ultimaker account server."),
|
||||
title = i18n_catalog.i18nc("@info:title", "Warning"),
|
||||
message_type = Message.MessageType.ERROR)
|
||||
Logger.log("w", "Unable to load auth data from preferences")
|
||||
self._unable_to_get_data_message.show()
|
||||
# Also check if we can actually get the user profile information.
|
||||
def callback(profile: Optional["UserProfile"]) -> None:
|
||||
if profile is not None:
|
||||
self.onAuthStateChanged.emit(logged_in = True)
|
||||
Logger.debug("Auth data was successfully loaded")
|
||||
else:
|
||||
if self._unable_to_get_data_message is not None:
|
||||
self._unable_to_get_data_message.show()
|
||||
else:
|
||||
self._unable_to_get_data_message = Message(i18n_catalog.i18nc("@info",
|
||||
"Unable to reach the Ultimaker account server."),
|
||||
title = i18n_catalog.i18nc("@info:title", "Log-in failed"),
|
||||
message_type = Message.MessageType.ERROR)
|
||||
Logger.warning("Unable to get user profile using auth data from preferences.")
|
||||
self._unable_to_get_data_message.show()
|
||||
self.getUserProfile(callback)
|
||||
except (ValueError, TypeError):
|
||||
Logger.logException("w", "Could not load auth data from preferences")
|
||||
|
||||
@ -271,8 +292,9 @@ class AuthorizationService:
|
||||
return
|
||||
|
||||
self._auth_data = auth_data
|
||||
self._currently_refreshing_token = False
|
||||
if auth_data:
|
||||
self._user_profile = self.getUserProfile()
|
||||
self.getUserProfile()
|
||||
self._preferences.setValue(self._settings.AUTH_DATA_PREFERENCE_KEY, json.dumps(auth_data.dump()))
|
||||
else:
|
||||
Logger.log("d", "Clearing the user profile")
|
||||
|
@ -66,7 +66,7 @@ class CloudPackageChecker(QObject):
|
||||
self._application.getHttpRequestManager().get(url,
|
||||
callback = self._onUserPackagesRequestFinished,
|
||||
error_callback = self._onUserPackagesRequestFinished,
|
||||
timeout=10,
|
||||
timeout = 10,
|
||||
scope = self._scope)
|
||||
|
||||
def _onUserPackagesRequestFinished(self, reply: "QNetworkReply", error: Optional["QNetworkReply.NetworkError"] = None) -> None:
|
||||
|
@ -80,46 +80,6 @@ def test_errorLoginState(application):
|
||||
account._onLoginStateChanged(False, "OMGZOMG!")
|
||||
account.loginStateChanged.emit.called_with(False)
|
||||
|
||||
|
||||
def test_userName(user_profile):
|
||||
account = Account(MagicMock())
|
||||
mocked_auth_service = MagicMock()
|
||||
account._authorization_service = mocked_auth_service
|
||||
mocked_auth_service.getUserProfile = MagicMock(return_value = user_profile)
|
||||
|
||||
assert account.userName == "username!"
|
||||
|
||||
mocked_auth_service.getUserProfile = MagicMock(return_value=None)
|
||||
assert account.userName is None
|
||||
|
||||
|
||||
def test_profileImageUrl(user_profile):
|
||||
account = Account(MagicMock())
|
||||
mocked_auth_service = MagicMock()
|
||||
account._authorization_service = mocked_auth_service
|
||||
mocked_auth_service.getUserProfile = MagicMock(return_value = user_profile)
|
||||
|
||||
assert account.profileImageUrl == "profile_image_url!"
|
||||
|
||||
mocked_auth_service.getUserProfile = MagicMock(return_value=None)
|
||||
assert account.profileImageUrl is None
|
||||
|
||||
|
||||
def test_userProfile(user_profile):
|
||||
account = Account(MagicMock())
|
||||
mocked_auth_service = MagicMock()
|
||||
account._authorization_service = mocked_auth_service
|
||||
mocked_auth_service.getUserProfile = MagicMock(return_value=user_profile)
|
||||
|
||||
returned_user_profile = account.userProfile
|
||||
assert returned_user_profile["username"] == "username!"
|
||||
assert returned_user_profile["profile_image_url"] == "profile_image_url!"
|
||||
assert returned_user_profile["user_id"] == "user_id!"
|
||||
|
||||
mocked_auth_service.getUserProfile = MagicMock(return_value=None)
|
||||
assert account.userProfile is None
|
||||
|
||||
|
||||
def test_sync_success():
|
||||
account = Account(MagicMock())
|
||||
|
||||
|
@ -1,9 +1,11 @@
|
||||
from datetime import datetime
|
||||
from unittest.mock import MagicMock, patch
|
||||
# Copyright (c) 2021 Ultimaker B.V.
|
||||
# Cura is released under the terms of the LGPLv3 or higher.
|
||||
|
||||
import requests
|
||||
from datetime import datetime
|
||||
from unittest.mock import MagicMock, Mock, patch
|
||||
|
||||
from PyQt5.QtGui import QDesktopServices
|
||||
from PyQt5.QtNetwork import QNetworkReply
|
||||
|
||||
from UM.Preferences import Preferences
|
||||
from cura.OAuth2.AuthorizationHelpers import AuthorizationHelpers, TOKEN_TIMESTAMP_FORMAT
|
||||
@ -39,6 +41,14 @@ SUCCESSFUL_AUTH_RESPONSE = AuthenticationResponse(
|
||||
success = True
|
||||
)
|
||||
|
||||
EXPIRED_AUTH_RESPONSE = AuthenticationResponse(
|
||||
access_token = "expired",
|
||||
refresh_token = "beep?",
|
||||
received_at = datetime.now().strftime(TOKEN_TIMESTAMP_FORMAT),
|
||||
expires_in = 300, # 5 minutes should be more than enough for testing
|
||||
success = True
|
||||
)
|
||||
|
||||
NO_REFRESH_AUTH_RESPONSE = AuthenticationResponse(
|
||||
access_token = "beep",
|
||||
received_at = datetime.now().strftime(TOKEN_TIMESTAMP_FORMAT),
|
||||
@ -50,12 +60,17 @@ MALFORMED_AUTH_RESPONSE = AuthenticationResponse(success=False)
|
||||
|
||||
|
||||
def test_cleanAuthService() -> None:
|
||||
# Ensure that when setting up an AuthorizationService, no data is set.
|
||||
"""
|
||||
Ensure that when setting up an AuthorizationService, no data is set.
|
||||
"""
|
||||
authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
|
||||
authorization_service.initialize()
|
||||
assert authorization_service.getUserProfile() is None
|
||||
assert authorization_service.getAccessToken() is None
|
||||
|
||||
mock_callback = Mock()
|
||||
authorization_service.getUserProfile(mock_callback)
|
||||
mock_callback.assert_called_once_with(None)
|
||||
|
||||
assert authorization_service.getAccessToken() is None
|
||||
|
||||
def test_refreshAccessTokenSuccess():
|
||||
authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
|
||||
@ -68,34 +83,83 @@ def test_refreshAccessTokenSuccess():
|
||||
authorization_service.refreshAccessToken()
|
||||
assert authorization_service.onAuthStateChanged.emit.called_with(True)
|
||||
|
||||
|
||||
def test__parseJWTNoRefreshToken():
|
||||
authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
|
||||
with patch.object(AuthorizationService, "getUserProfile", return_value=UserProfile()):
|
||||
authorization_service._storeAuthData(NO_REFRESH_AUTH_RESPONSE)
|
||||
assert authorization_service._parseJWT() is None
|
||||
"""
|
||||
Tests parsing the user profile if there is no refresh token stored, but there is a normal authentication token.
|
||||
|
||||
The request for the user profile using the authentication token should still work normally.
|
||||
"""
|
||||
authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
|
||||
with patch.object(AuthorizationService, "getUserProfile", return_value = UserProfile()):
|
||||
authorization_service._storeAuthData(NO_REFRESH_AUTH_RESPONSE)
|
||||
|
||||
mock_callback = Mock() # To log the final profile response.
|
||||
mock_reply = Mock() # The user profile that the service should respond with.
|
||||
mock_reply.error = Mock(return_value = QNetworkReply.NetworkError.NoError)
|
||||
http_mock = Mock()
|
||||
http_mock.get = lambda url, headers_dict, callback, error_callback: callback(mock_reply)
|
||||
http_mock.readJSON = Mock(return_value = {"data": {"user_id": "id_ego_or_superego", "username": "Ghostkeeper"}})
|
||||
|
||||
with patch("UM.TaskManagement.HttpRequestManager.HttpRequestManager.getInstance", MagicMock(return_value = http_mock)):
|
||||
authorization_service._parseJWT(mock_callback)
|
||||
mock_callback.assert_called_once()
|
||||
profile_reply = mock_callback.call_args_list[0][0][0]
|
||||
assert profile_reply.user_id == "id_ego_or_superego"
|
||||
assert profile_reply.username == "Ghostkeeper"
|
||||
|
||||
def test__parseJWTFailOnRefresh():
|
||||
"""
|
||||
Tries to refresh the authentication token using an invalid refresh token. The request should fail.
|
||||
"""
|
||||
authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
|
||||
with patch.object(AuthorizationService, "getUserProfile", return_value=UserProfile()):
|
||||
with patch.object(AuthorizationService, "getUserProfile", return_value = UserProfile()):
|
||||
authorization_service._storeAuthData(SUCCESSFUL_AUTH_RESPONSE)
|
||||
|
||||
with patch.object(AuthorizationHelpers, "getAccessTokenUsingRefreshToken", return_value=FAILED_AUTH_RESPONSE):
|
||||
assert authorization_service._parseJWT() is None
|
||||
mock_callback = Mock() # To log the final profile response.
|
||||
mock_reply = Mock() # The response that the request should give, containing an error about it failing to authenticate.
|
||||
mock_reply.error = Mock(return_value = QNetworkReply.NetworkError.AuthenticationRequiredError) # The reply is 403: Authentication required, meaning the server responded with a "Can't do that, Dave".
|
||||
http_mock = Mock()
|
||||
http_mock.get = lambda url, headers_dict, callback, error_callback: callback(mock_reply)
|
||||
http_mock.post = lambda url, data, headers_dict, callback, error_callback: callback(mock_reply)
|
||||
|
||||
with patch("UM.TaskManagement.HttpRequestManager.HttpRequestManager.readJSON", Mock(return_value = {"error_description": "Mock a failed request!"})):
|
||||
with patch("UM.TaskManagement.HttpRequestManager.HttpRequestManager.getInstance", MagicMock(return_value = http_mock)):
|
||||
authorization_service._parseJWT(mock_callback)
|
||||
mock_callback.assert_called_once_with(None)
|
||||
|
||||
def test__parseJWTSucceedOnRefresh():
|
||||
"""
|
||||
Tries to refresh the authentication token using a valid refresh token. The request should succeed.
|
||||
"""
|
||||
authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
|
||||
authorization_service.initialize()
|
||||
with patch.object(AuthorizationService, "getUserProfile", return_value=UserProfile()):
|
||||
authorization_service._storeAuthData(SUCCESSFUL_AUTH_RESPONSE)
|
||||
with patch.object(AuthorizationService, "getUserProfile", return_value = UserProfile()):
|
||||
authorization_service._storeAuthData(EXPIRED_AUTH_RESPONSE)
|
||||
|
||||
with patch.object(AuthorizationHelpers, "getAccessTokenUsingRefreshToken", return_value=SUCCESSFUL_AUTH_RESPONSE):
|
||||
with patch.object(AuthorizationHelpers, "parseJWT", MagicMock(return_value = None)) as mocked_parseJWT:
|
||||
authorization_service._parseJWT()
|
||||
mocked_parseJWT.assert_called_with("beep")
|
||||
mock_callback = Mock() # To log the final profile response.
|
||||
mock_reply_success = Mock() # The reply should be a failure when using the expired access token, but succeed when using the refresh token.
|
||||
mock_reply_success.error = Mock(return_value = QNetworkReply.NetworkError.NoError)
|
||||
mock_reply_failure = Mock()
|
||||
mock_reply_failure.error = Mock(return_value = QNetworkReply.NetworkError.AuthenticationRequiredError)
|
||||
http_mock = Mock()
|
||||
def mock_get(url, headers_dict, callback, error_callback):
|
||||
if(headers_dict == {"Authorization": "Bearer beep"}):
|
||||
callback(mock_reply_success)
|
||||
else:
|
||||
callback(mock_reply_failure)
|
||||
http_mock.get = mock_get
|
||||
http_mock.readJSON = Mock(return_value = {"data": {"user_id": "user_idea", "username": "Ghostkeeper"}})
|
||||
def mock_refresh(self, refresh_token, callback): # Refreshing gives a valid token.
|
||||
callback(SUCCESSFUL_AUTH_RESPONSE)
|
||||
|
||||
with patch("cura.OAuth2.AuthorizationHelpers.AuthorizationHelpers.getAccessTokenUsingRefreshToken", mock_refresh):
|
||||
with patch("UM.TaskManagement.HttpRequestManager.HttpRequestManager.getInstance", MagicMock(return_value = http_mock)):
|
||||
authorization_service._parseJWT(mock_callback)
|
||||
|
||||
mock_callback.assert_called_once()
|
||||
profile_reply = mock_callback.call_args_list[0][0][0]
|
||||
assert profile_reply.user_id == "user_idea"
|
||||
assert profile_reply.username == "Ghostkeeper"
|
||||
|
||||
def test_initialize():
|
||||
original_preference = MagicMock()
|
||||
@ -105,17 +169,28 @@ def test_initialize():
|
||||
initialize_preferences.addPreference.assert_called_once_with("test/auth_data", "{}")
|
||||
original_preference.addPreference.assert_not_called()
|
||||
|
||||
|
||||
def test_refreshAccessTokenFailed():
|
||||
"""
|
||||
Test if the authentication is reset once the refresh token fails to refresh access.
|
||||
"""
|
||||
authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
|
||||
authorization_service.initialize()
|
||||
with patch.object(AuthorizationService, "getUserProfile", return_value=UserProfile()):
|
||||
authorization_service._storeAuthData(SUCCESSFUL_AUTH_RESPONSE)
|
||||
authorization_service.onAuthStateChanged.emit = MagicMock()
|
||||
with patch.object(AuthorizationHelpers, "getAccessTokenUsingRefreshToken", return_value=FAILED_AUTH_RESPONSE):
|
||||
authorization_service.refreshAccessToken()
|
||||
assert authorization_service.onAuthStateChanged.emit.called_with(False)
|
||||
|
||||
def mock_refresh(self, refresh_token, callback): # Refreshing gives a valid token.
|
||||
callback(FAILED_AUTH_RESPONSE)
|
||||
mock_reply = Mock() # The response that the request should give, containing an error about it failing to authenticate.
|
||||
mock_reply.error = Mock(return_value = QNetworkReply.NetworkError.AuthenticationRequiredError) # The reply is 403: Authentication required, meaning the server responded with a "Can't do that, Dave".
|
||||
http_mock = Mock()
|
||||
http_mock.get = lambda url, headers_dict, callback, error_callback: callback(mock_reply)
|
||||
http_mock.post = lambda url, data, headers_dict, callback, error_callback: callback(mock_reply)
|
||||
|
||||
with patch("UM.TaskManagement.HttpRequestManager.HttpRequestManager.readJSON", Mock(return_value = {"error_description": "Mock a failed request!"})):
|
||||
with patch("UM.TaskManagement.HttpRequestManager.HttpRequestManager.getInstance", MagicMock(return_value = http_mock)):
|
||||
authorization_service._storeAuthData(SUCCESSFUL_AUTH_RESPONSE)
|
||||
authorization_service.onAuthStateChanged.emit = MagicMock()
|
||||
with patch("cura.OAuth2.AuthorizationHelpers.AuthorizationHelpers.getAccessTokenUsingRefreshToken", mock_refresh):
|
||||
authorization_service.refreshAccessToken()
|
||||
assert authorization_service.onAuthStateChanged.emit.called_with(False)
|
||||
|
||||
def test_refreshAccesTokenWithoutData():
|
||||
authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
|
||||
@ -124,14 +199,6 @@ def test_refreshAccesTokenWithoutData():
|
||||
authorization_service.refreshAccessToken()
|
||||
authorization_service.onAuthStateChanged.emit.assert_not_called()
|
||||
|
||||
|
||||
def test_userProfileException():
|
||||
authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
|
||||
authorization_service.initialize()
|
||||
authorization_service._parseJWT = MagicMock(side_effect=requests.exceptions.ConnectionError)
|
||||
assert authorization_service.getUserProfile() is None
|
||||
|
||||
|
||||
def test_failedLogin() -> None:
|
||||
authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
|
||||
authorization_service.onAuthenticationError.emit = MagicMock()
|
||||
@ -151,8 +218,7 @@ def test_failedLogin() -> None:
|
||||
assert authorization_service.getUserProfile() is None
|
||||
assert authorization_service.getAccessToken() is None
|
||||
|
||||
|
||||
@patch.object(AuthorizationService, "getUserProfile", return_value=UserProfile())
|
||||
@patch.object(AuthorizationService, "getUserProfile")
|
||||
def test_storeAuthData(get_user_profile) -> None:
|
||||
preferences = Preferences()
|
||||
authorization_service = AuthorizationService(OAUTH_SETTINGS, preferences)
|
||||
@ -170,7 +236,6 @@ def test_storeAuthData(get_user_profile) -> None:
|
||||
second_auth_service.loadAuthDataFromPreferences()
|
||||
assert second_auth_service.getAccessToken() == SUCCESSFUL_AUTH_RESPONSE.access_token
|
||||
|
||||
|
||||
@patch.object(LocalAuthorizationServer, "stop")
|
||||
@patch.object(LocalAuthorizationServer, "start")
|
||||
@patch.object(QDesktopServices, "openUrl")
|
||||
@ -188,7 +253,6 @@ def test_localAuthServer(QDesktopServices_openUrl, start_auth_server, stop_auth_
|
||||
# Ensure that it stopped the server.
|
||||
assert stop_auth_server.call_count == 1
|
||||
|
||||
|
||||
def test_loginAndLogout() -> None:
|
||||
preferences = Preferences()
|
||||
authorization_service = AuthorizationService(OAUTH_SETTINGS, preferences)
|
||||
@ -196,8 +260,14 @@ def test_loginAndLogout() -> None:
|
||||
authorization_service.onAuthStateChanged.emit = MagicMock()
|
||||
authorization_service.initialize()
|
||||
|
||||
mock_reply = Mock() # The user profile that the service should respond with.
|
||||
mock_reply.error = Mock(return_value = QNetworkReply.NetworkError.NoError)
|
||||
http_mock = Mock()
|
||||
http_mock.get = lambda url, headers_dict, callback, error_callback: callback(mock_reply)
|
||||
http_mock.readJSON = Mock(return_value = {"data": {"user_id": "di_resu", "username": "Emanresu"}})
|
||||
|
||||
# Let the service think there was a successful response
|
||||
with patch.object(AuthorizationHelpers, "parseJWT", return_value=UserProfile()):
|
||||
with patch("UM.TaskManagement.HttpRequestManager.HttpRequestManager.getInstance", MagicMock(return_value = http_mock)):
|
||||
authorization_service._onAuthStateChanged(SUCCESSFUL_AUTH_RESPONSE)
|
||||
|
||||
# Ensure that the error signal was not triggered
|
||||
@ -205,7 +275,10 @@ def test_loginAndLogout() -> None:
|
||||
|
||||
# Since we said that it went right this time, validate that we got a signal.
|
||||
assert authorization_service.onAuthStateChanged.emit.call_count == 1
|
||||
assert authorization_service.getUserProfile() is not None
|
||||
with patch("UM.TaskManagement.HttpRequestManager.HttpRequestManager.getInstance", MagicMock(return_value = http_mock)):
|
||||
def callback(profile):
|
||||
assert profile is not None
|
||||
authorization_service.getUserProfile(callback)
|
||||
assert authorization_service.getAccessToken() == "beep"
|
||||
|
||||
# Check that we stored the authentication data, so next time the user won't have to log in again.
|
||||
@ -214,19 +287,22 @@ def test_loginAndLogout() -> None:
|
||||
# We're logged in now, also check if logging out works
|
||||
authorization_service.deleteAuthData()
|
||||
assert authorization_service.onAuthStateChanged.emit.call_count == 2
|
||||
assert authorization_service.getUserProfile() is None
|
||||
with patch("UM.TaskManagement.HttpRequestManager.HttpRequestManager.getInstance", MagicMock(return_value = http_mock)):
|
||||
def callback(profile):
|
||||
assert profile is None
|
||||
authorization_service.getUserProfile(callback)
|
||||
|
||||
# Ensure the data is gone after we logged out.
|
||||
assert preferences.getValue("test/auth_data") == "{}"
|
||||
|
||||
|
||||
def test_wrongServerResponses() -> None:
|
||||
authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
|
||||
authorization_service.initialize()
|
||||
with patch.object(AuthorizationHelpers, "parseJWT", return_value=UserProfile()):
|
||||
authorization_service._onAuthStateChanged(MALFORMED_AUTH_RESPONSE)
|
||||
assert authorization_service.getUserProfile() is None
|
||||
authorization_service._onAuthStateChanged(MALFORMED_AUTH_RESPONSE)
|
||||
|
||||
def callback(profile):
|
||||
assert profile is None
|
||||
authorization_service.getUserProfile(callback)
|
||||
|
||||
def test__generate_auth_url() -> None:
|
||||
preferences = Preferences()
|
||||
|
Loading…
x
Reference in New Issue
Block a user