diff --git a/Pipfile b/Pipfile index 91aebdd8..db8e91fe 100644 --- a/Pipfile +++ b/Pipfile @@ -20,6 +20,7 @@ black = "*" [packages] aiohttp = "*" backoff = "*" +bs4 = "*" wrapt = "*" [pipenv] diff --git a/teslajsonpy/connection.py b/teslajsonpy/connection.py index ff24cff5..b9c64bd0 100644 --- a/teslajsonpy/connection.py +++ b/teslajsonpy/connection.py @@ -6,12 +6,17 @@ https://github.com/zabuldon/teslajsonpy """ import asyncio +import base64 import calendar import datetime +import hashlib import json import logging +import secrets import time from typing import Dict, Text +from bs4 import BeautifulSoup +import yarl import aiohttp from yarl import URL @@ -32,6 +37,7 @@ def __init__( password: Text = None, access_token: Text = None, refresh_token: Text = None, + authorization_token=None, expiration: int = 0, ) -> None: """Initialize connection object.""" @@ -52,14 +58,20 @@ def __init__( self.refresh_token = refresh_token self.websession = websession self.token_refreshed = False - self.generate_oauth(email, password, refresh_token) + self.code_verifier: Text = secrets.token_urlsafe(64) + self.sso_oauth: Dict[Text, Text] = {} + self.generate_oauth(email, password, refresh_token, authorization_token) if self.access_token: self.__sethead(access_token=self.access_token, expiration=self.expiration) _LOGGER.debug("Connecting with existing access token") self.websocket = None def generate_oauth( - self, email: Text = None, password: Text = None, refresh_token: Text = None + self, + email: Text = None, + password: Text = None, + refresh_token: Text = None, + authorization_token=None, ) -> None: """Generate oauth header. @@ -67,6 +79,7 @@ def generate_oauth( email (Text, optional): Tesla account email address. Defaults to None. password (Text, optional): Password for account. Defaults to None. refresh_token (Text, optional): Refresh token. Defaults to None. + authorization_token (Text, optional): Authorization token. Defaults to None. Raises IncompleteCredentials @@ -77,7 +90,13 @@ def generate_oauth( """ refresh_token = refresh_token or self.refresh_token self.oauth = {"client_id": self.client_id, "client_secret": self.client_secret} - if email and password: + if authorization_token: + self.oauth = {"client_id": "ownerapi"} + self.oauth["grant_type"] = "authorization_code" + self.oauth["code"] = authorization_token + self.oauth["code_verifier"] = self.code_verifier + self.oauth["redirect_uri"] = "https://auth.tesla.com/void/callback" + elif email and password: self.oauth["grant_type"] = "password" self.oauth["email"] = email self.oauth["password"] = password @@ -104,7 +123,31 @@ async def post(self, command, method="post", data=None): _LOGGER.debug( "Requesting new oauth token using %s", self.oauth["grant_type"] ) - auth = await self.__open("/oauth/token", "post", data=self.oauth) + if not self.sso_oauth or ( + now > self.sso_oauth.get("expires_in", 0) + and not self.sso_oauth.get("refresh_token") + ): + await self.get_authorization_code() + auth = await self.get_sso_auth_token() + self.sso_oauth = { + "access_token": auth["access_token"], + "refresh_token": auth["refresh_token"], + "expires_in": auth["expires_in"] + now, + } + elif self.sso_oauth.get("refresh_token") and now > self.sso_oauth.get( + "expires_in", 0 + ): + auth = await self.refresh_access_token( + refresh_token=self.sso_oauth.get("refresh_token") + ) + self.sso_oauth = { + "access_token": auth["access_token"], + "refresh_token": auth["refresh_token"], + "expires_in": auth["expires_in"] + now, + } + auth = await self.get_bearer_token( + access_token=self.sso_oauth.get("access_token") + ) self.__sethead( access_token=auth["access_token"], expires_in=auth["expires_in"] ) @@ -135,11 +178,13 @@ async def __open( url: Text, method: Text = "get", headers=None, + cookies=None, data=None, baseurl: Text = "", ) -> None: """Open url.""" headers = headers or {} + cookies = cookies or {} if not baseurl: baseurl = self.baseurl url: URL = URL(baseurl + url) @@ -148,7 +193,7 @@ async def __open( try: resp = await getattr(self.websession, method)( - url, headers=headers, data=data + url, data=data, headers=headers, cookies=cookies ) data = await resp.json() _LOGGER.debug("%s: %s", resp.status, json.dumps(data)) @@ -327,3 +372,124 @@ async def _process_messages() -> None: # "tag": f"{vehicle_id}", # } # ) + + async def get_authorization_code(self) -> Text: + """Get authorization code from the oauth3 login method.""" + # https://tesla-api.timdorr.com/api-basics/authentication#step-2-obtain-an-authorization-code + code_challenge = str( + base64.urlsafe_b64encode( + hashlib.sha256(self.code_verifier.encode()).hexdigest().encode() + ), + "utf-8", + ) + state = secrets.token_urlsafe(64) + query = { + "client_id": "ownerapi", + "code_challenge": code_challenge, + "code_challenge_method": "S256", + "redirect_uri": "https://auth.tesla.com/void/callback", + "response_type": "code", + "scope": "openid email offline_access", + "state": state, + } + url = yarl.URL("https://auth.tesla.com/oauth2/v3/authorize") + url = url.update_query(query) + resp = await self.websession.get(url) + html = await resp.text() + soup: BeautifulSoup = BeautifulSoup(html, "html.parser") + data = get_inputs(soup) + data["identity"] = self.oauth["email"] + data["credential"] = self.oauth["password"] + resp = await self.websession.post(url, data=data) + _process_resp(resp) + code_url = URL(resp.history[-1].url) + self.generate_oauth(authorization_token=code_url.query.get("code")) + return code_url.query.get("code") + + async def get_sso_auth_token(self): + """Get sso auth token.""" + # https://tesla-api.timdorr.com/api-basics/authentication#step-2-obtain-an-authorization-code + _LOGGER.debug("Requesting new oauth token using %s", self.oauth["grant_type"]) + if self.oauth["grant_type"] == "authorization_code": + auth = await self.__open( + "/oauth2/v3/token", + "post", + data=self.oauth, + baseurl="https://auth.tesla.com", + ) + else: + auth = await self.__open("/oauth/token", "post", data=self.oauth) + return auth + + async def refresh_access_token(self, refresh_token): + """Refresh access token from sso.""" + # https://tesla-api.timdorr.com/api-basics/authentication#refreshing-an-access-token + if not refresh_token: + _LOGGER.debug("Missing refresh token") + return + _LOGGER.debug("Refreshing access token with refresh_token") + oauth = { + "client_id": "ownerapi", + "grant_type": "refresh_token", + "refresh_token": refresh_token, + "scope": "openid email offline_access", + } + auth = await self.__open( + "/oauth2/v3/token", "post", data=oauth, baseurl="https://auth.tesla.com", + ) + return auth + + async def get_bearer_token(self, access_token): + """Get bearer token. This is used by the owners API.""" + # https://tesla-api.timdorr.com/api-basics/authentication#step-4-exchange-bearer-token-for-access-token + if not access_token: + _LOGGER.debug("Missing access token") + return + _LOGGER.debug("Exchanging bearer token with access token:") + oauth = { + "client_id": self.client_id, + "grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer", + } + head = { + "Authorization": f"Bearer {access_token}", + } + auth = await self.__open("/oauth/token", "post", headers=head, data=oauth) + return auth + + +def get_inputs(soup: BeautifulSoup, searchfield=None) -> Dict[str, str]: + """Parse soup for form with searchfield.""" + searchfield = searchfield or {"id": "form"} + data = {} + form = soup.find("form", searchfield) + if not form: + form = soup.find("form") + for field in form.find_all("input"): + try: + data[field["name"]] = "" + if field["type"] and field["type"] == "hidden": + data[field["name"]] = field["value"] + except BaseException: # pylint: disable=broad-except + pass + return data + + +def _process_resp(resp) -> Text: + if resp.history: + for item in resp.history: + _LOGGER.debug("%s: redirected from\n%s", item.method, item.url) + url = str(resp.request_info.url) + method = resp.request_info.method + status = resp.status + reason = resp.reason + headers = resp.request_info.headers + _LOGGER.debug( + "%s: \n%s with\n%s\n returned %s:%s with response %s", + method, + url, + headers, + status, + reason, + resp.headers, + ) + return url