diff --git a/teslajsonpy/connection.py b/teslajsonpy/connection.py index bcf7d727..539a09fa 100644 --- a/teslajsonpy/connection.py +++ b/teslajsonpy/connection.py @@ -51,67 +51,28 @@ def __init__( self.baseurl: Text = "https://owner-api.teslamotors.com" self.websocket_url: Text = "wss://streaming.vn.teslamotors.com/streaming" self.api: Text = "/api/1/" - self.oauth: Dict[Text, Text] = {} self.expiration: int = expiration self.access_token = access_token self.head = None self.refresh_token = refresh_token self.websession = websession + self.email = email + self.password = password self.token_refreshed = False self.code_verifier: Text = secrets.token_urlsafe(64) + self.code_challenge = str( + base64.urlsafe_b64encode( + hashlib.sha256(self.code_verifier.encode()).hexdigest().encode() + ), + "utf-8", + ) + self.code = authorization_token self.sso_oauth: Dict[Text, Text] = {} - self.generate_oauth(email, password, refresh_token, authorization_token) if self.access_token: self.__sethead(access_token=self.access_token, expiration=self.expiration) _LOGGER.debug("Connecting with existing access token") self.websocket = None - def generate_oauth( - self, - email: Text = None, - password: Text = None, - refresh_token: Text = None, - authorization_token=None, - ) -> None: - """Generate oauth header. - - Args - email (Text, optional): Tesla account email address. Defaults to None. - password (Text, optional): Password for account. Defaults to None. - refresh_token (Text, optional): Refresh token. Defaults to None. - authorization_token (Text, optional): Authorization token. Defaults to None. - - Raises - IncompleteCredentials - - Returns - None - - """ - refresh_token = refresh_token or self.refresh_token - self.oauth = {"client_id": self.client_id, "client_secret": self.client_secret} - if authorization_token: - self.oauth = {"client_id": "ownerapi"} - self.oauth["grant_type"] = "authorization_code" - self.oauth["code"] = authorization_token - self.oauth["code_verifier"] = self.code_verifier - self.oauth["redirect_uri"] = "https://auth.tesla.com/void/callback" - elif email and password: - self.oauth["grant_type"] = "password" - self.oauth["email"] = email - self.oauth["password"] = password - elif refresh_token: - self.oauth["grant_type"] = "refresh_token" - self.oauth["refresh_token"] = refresh_token - elif not refresh_token: - raise IncompleteCredentials( - "Missing oauth authentication details: refresh token." - ) - else: - raise IncompleteCredentials( - "Missing oauth authentication details: email and password." - ) - async def get(self, command): """Get data from API.""" return await self.post(command, "get", None) @@ -120,31 +81,40 @@ async def post(self, command, method="post", data=None): """Post data to API.""" now = calendar.timegm(datetime.datetime.now().timetuple()) if now > self.expiration: - _LOGGER.debug( - "Requesting new oauth token using %s", self.oauth["grant_type"] - ) - if not self.sso_oauth or ( - now > self.sso_oauth.get("expires_in", 0) - and not self.sso_oauth.get("refresh_token") + self.token_refreshed = False + auth = {} + _LOGGER.debug("Oauth expiration detected") + if (self.code or (self.email and self.password)) and ( + not self.sso_oauth + or ( + now > self.sso_oauth.get("expires_in", 0) + and not self.sso_oauth.get("refresh_token") + ) ): - await self.get_authorization_code() - auth = await self.get_sso_auth_token() - self.sso_oauth = { - "access_token": auth["access_token"], - "refresh_token": auth["refresh_token"], - "expires_in": auth["expires_in"] + now, - } + _LOGGER.debug("Getting sso auth code using credentials") + if self.email and self.password: + self.code = await self.get_authorization_code( + self.email, self.password + ) + auth = await self.get_sso_auth_token(self.code) elif self.sso_oauth.get("refresh_token") and now > self.sso_oauth.get( "expires_in", 0 ): + _LOGGER.debug("Refreshing sso auth code") auth = await self.refresh_access_token( refresh_token=self.sso_oauth.get("refresh_token") ) + if auth: self.sso_oauth = { "access_token": auth["access_token"], "refresh_token": auth["refresh_token"], "expires_in": auth["expires_in"] + now, } + _LOGGER.debug("Saving new auth info %s", self.sso_oauth) + else: + _LOGGER.debug("Unable to refresh sso oauth token") + self.sso_oauth = {} + raise IncompleteCredentials("Need oauth credentials") auth = await self.get_bearer_token( access_token=self.sso_oauth.get("access_token") ) @@ -152,15 +122,13 @@ async def post(self, command, method="post", data=None): access_token=auth["access_token"], expires_in=auth["expires_in"] ) self.refresh_token = auth["refresh_token"] - self.generate_oauth() self.token_refreshed = True + _LOGGER.debug("Successfully refreshed oauth") return await self.__open( f"{self.api}{command}", method=method, headers=self.head, data=data ) - def __sethead( - self, access_token: Text, expires_in: int = 1800, expiration: int = 0 - ): + def __sethead(self, access_token: Text, expires_in: int = 30, expiration: int = 0): """Set HTTP header.""" self.access_token = access_token if expiration > 0: @@ -373,19 +341,17 @@ async def _process_messages() -> None: # } # ) - async def get_authorization_code(self) -> Text: + async def get_authorization_code(self, email, password) -> Text: """Get authorization code from the oauth3 login method.""" # https://tesla-api.timdorr.com/api-basics/authentication#step-2-obtain-an-authorization-code - code_challenge = str( - base64.urlsafe_b64encode( - hashlib.sha256(self.code_verifier.encode()).hexdigest().encode() - ), - "utf-8", - ) + if not (email and password): + _LOGGER.debug("No email or password for login; unable to login.") + return + state = secrets.token_urlsafe(64) query = { "client_id": "ownerapi", - "code_challenge": code_challenge, + "code_challenge": self.code_challenge, "code_challenge_method": "S256", "redirect_uri": "https://auth.tesla.com/void/callback", "response_type": "code", @@ -394,32 +360,36 @@ async def get_authorization_code(self) -> Text: } url = yarl.URL("https://auth.tesla.com/oauth2/v3/authorize") url = url.update_query(query) + _LOGGER.debug("Getting sso auth token from %s", url) resp = await self.websession.get(url) html = await resp.text() soup: BeautifulSoup = BeautifulSoup(html, "html.parser") data = get_inputs(soup) - data["identity"] = self.oauth["email"] - data["credential"] = self.oauth["password"] + data["identity"] = self.email + data["credential"] = self.password resp = await self.websession.post(url, data=data) _process_resp(resp) code_url = URL(resp.history[-1].url) - self.generate_oauth(authorization_token=code_url.query.get("code")) return code_url.query.get("code") - async def get_sso_auth_token(self): + async def get_sso_auth_token(self, code): """Get sso auth token.""" # https://tesla-api.timdorr.com/api-basics/authentication#step-2-obtain-an-authorization-code - _LOGGER.debug("Requesting new oauth token using %s", self.oauth["grant_type"]) - if self.oauth["grant_type"] == "authorization_code": - auth = await self.__open( - "/oauth2/v3/token", - "post", - data=self.oauth, - baseurl="https://auth.tesla.com", - ) - else: - auth = await self.__open("/oauth/token", "post", data=self.oauth) - return auth + _LOGGER.debug("Requesting new sso oauth token using sso auth code") + if not code: + _LOGGER.debug("No authorization code provided") + return + oauth = { + "client_id": "ownerapi", + "grant_type": "authorization_code", + "code": code, + "code_verifier": self.code_verifier, + "redirect_uri": "https://auth.tesla.com/void/callback", + } + auth = await self.websession.post( + "https://auth.tesla.com/oauth2/v3/token", data=oauth, + ) + return await auth.json() async def refresh_access_token(self, refresh_token): """Refresh access token from sso.""" @@ -434,10 +404,10 @@ async def refresh_access_token(self, refresh_token): "refresh_token": refresh_token, "scope": "openid email offline_access", } - auth = await self.__open( - "/oauth2/v3/token", "post", data=oauth, baseurl="https://auth.tesla.com", + auth = await self.websession.post( + "https://auth.tesla.com/oauth2/v3/token", data=oauth, ) - return auth + return await auth.json() async def get_bearer_token(self, access_token): """Get bearer token. This is used by the owners API.""" @@ -453,8 +423,10 @@ async def get_bearer_token(self, access_token): head = { "Authorization": f"Bearer {access_token}", } - auth = await self.__open("/oauth/token", "post", headers=head, data=oauth) - return auth + auth = await self.websession.post( + "https://owner-api.teslamotors.com/oauth/token", headers=head, data=oauth + ) + return await auth.json() def get_inputs(soup: BeautifulSoup, searchfield=None) -> Dict[str, str]: diff --git a/tests/tesla_mock.py b/tests/tesla_mock.py index f5835914..533a1e16 100644 --- a/tests/tesla_mock.py +++ b/tests/tesla_mock.py @@ -41,9 +41,6 @@ def __init__(self, monkeypatch) -> None: Controller, "get_last_update_time", self.mock_get_last_update_time ) self._monkeypatch.setattr(Controller, "update", self.mock_update) - self._monkeypatch.setattr( - Connection, "generate_oauth", self.mock_generate_oauth - ) def mock_connect(self, *args, **kwargs): # pylint: disable=unused-argument @@ -95,11 +92,6 @@ def mock_update(self, *args, **kwargs): """ Mock controller's update method.""" return self.controller_update() - def mock_generate_oauth(self, *args, **kwargs): - # pylint: disable=unused-argument - """ Mock connection's generate_oauth method.""" - return self.connection_generate_oauth() - @staticmethod def controller_connect(): """ Monkeypatch for controller.connect()."""