Skip to content

Commit

Permalink
Merge pull request #132 from alandtse/ci_fix
Browse files Browse the repository at this point in the history
Signed-off-by: Alan Tse <alandtse@gmail.com>
  • Loading branch information
alandtse committed Jan 31, 2021
2 parents e37d232 + fba3f92 commit ff99a2f
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 101 deletions.
158 changes: 65 additions & 93 deletions teslajsonpy/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,67 +51,28 @@ def __init__(
self.baseurl: Text = "https://owner-api.teslamotors.com"
self.websocket_url: Text = "wss://streaming.vn.teslamotors.com/streaming"
self.api: Text = "/api/1/"
self.oauth: Dict[Text, Text] = {}
self.expiration: int = expiration
self.access_token = access_token
self.head = None
self.refresh_token = refresh_token
self.websession = websession
self.email = email
self.password = password
self.token_refreshed = False
self.code_verifier: Text = secrets.token_urlsafe(64)
self.code_challenge = str(
base64.urlsafe_b64encode(
hashlib.sha256(self.code_verifier.encode()).hexdigest().encode()
),
"utf-8",
)
self.code = authorization_token
self.sso_oauth: Dict[Text, Text] = {}
self.generate_oauth(email, password, refresh_token, authorization_token)
if self.access_token:
self.__sethead(access_token=self.access_token, expiration=self.expiration)
_LOGGER.debug("Connecting with existing access token")
self.websocket = None

def generate_oauth(
self,
email: Text = None,
password: Text = None,
refresh_token: Text = None,
authorization_token=None,
) -> None:
"""Generate oauth header.
Args
email (Text, optional): Tesla account email address. Defaults to None.
password (Text, optional): Password for account. Defaults to None.
refresh_token (Text, optional): Refresh token. Defaults to None.
authorization_token (Text, optional): Authorization token. Defaults to None.
Raises
IncompleteCredentials
Returns
None
"""
refresh_token = refresh_token or self.refresh_token
self.oauth = {"client_id": self.client_id, "client_secret": self.client_secret}
if authorization_token:
self.oauth = {"client_id": "ownerapi"}
self.oauth["grant_type"] = "authorization_code"
self.oauth["code"] = authorization_token
self.oauth["code_verifier"] = self.code_verifier
self.oauth["redirect_uri"] = "https://auth.tesla.com/void/callback"
elif email and password:
self.oauth["grant_type"] = "password"
self.oauth["email"] = email
self.oauth["password"] = password
elif refresh_token:
self.oauth["grant_type"] = "refresh_token"
self.oauth["refresh_token"] = refresh_token
elif not refresh_token:
raise IncompleteCredentials(
"Missing oauth authentication details: refresh token."
)
else:
raise IncompleteCredentials(
"Missing oauth authentication details: email and password."
)

async def get(self, command):
"""Get data from API."""
return await self.post(command, "get", None)
Expand All @@ -120,47 +81,54 @@ async def post(self, command, method="post", data=None):
"""Post data to API."""
now = calendar.timegm(datetime.datetime.now().timetuple())
if now > self.expiration:
_LOGGER.debug(
"Requesting new oauth token using %s", self.oauth["grant_type"]
)
if not self.sso_oauth or (
now > self.sso_oauth.get("expires_in", 0)
and not self.sso_oauth.get("refresh_token")
self.token_refreshed = False
auth = {}
_LOGGER.debug("Oauth expiration detected")
if (self.code or (self.email and self.password)) and (
not self.sso_oauth
or (
now > self.sso_oauth.get("expires_in", 0)
and not self.sso_oauth.get("refresh_token")
)
):
await self.get_authorization_code()
auth = await self.get_sso_auth_token()
self.sso_oauth = {
"access_token": auth["access_token"],
"refresh_token": auth["refresh_token"],
"expires_in": auth["expires_in"] + now,
}
_LOGGER.debug("Getting sso auth code using credentials")
if self.email and self.password:
self.code = await self.get_authorization_code(
self.email, self.password
)
auth = await self.get_sso_auth_token(self.code)
elif self.sso_oauth.get("refresh_token") and now > self.sso_oauth.get(
"expires_in", 0
):
_LOGGER.debug("Refreshing sso auth code")
auth = await self.refresh_access_token(
refresh_token=self.sso_oauth.get("refresh_token")
)
if auth:
self.sso_oauth = {
"access_token": auth["access_token"],
"refresh_token": auth["refresh_token"],
"expires_in": auth["expires_in"] + now,
}
_LOGGER.debug("Saving new auth info %s", self.sso_oauth)
else:
_LOGGER.debug("Unable to refresh sso oauth token")
self.sso_oauth = {}
raise IncompleteCredentials("Need oauth credentials")
auth = await self.get_bearer_token(
access_token=self.sso_oauth.get("access_token")
)
self.__sethead(
access_token=auth["access_token"], expires_in=auth["expires_in"]
)
self.refresh_token = auth["refresh_token"]
self.generate_oauth()
self.token_refreshed = True
_LOGGER.debug("Successfully refreshed oauth")
return await self.__open(
f"{self.api}{command}", method=method, headers=self.head, data=data
)

def __sethead(
self, access_token: Text, expires_in: int = 1800, expiration: int = 0
):
def __sethead(self, access_token: Text, expires_in: int = 30, expiration: int = 0):
"""Set HTTP header."""
self.access_token = access_token
if expiration > 0:
Expand Down Expand Up @@ -373,19 +341,17 @@ async def _process_messages() -> None:
# }
# )

async def get_authorization_code(self) -> Text:
async def get_authorization_code(self, email, password) -> Text:
"""Get authorization code from the oauth3 login method."""
# https://tesla-api.timdorr.com/api-basics/authentication#step-2-obtain-an-authorization-code
code_challenge = str(
base64.urlsafe_b64encode(
hashlib.sha256(self.code_verifier.encode()).hexdigest().encode()
),
"utf-8",
)
if not (email and password):
_LOGGER.debug("No email or password for login; unable to login.")
return

state = secrets.token_urlsafe(64)
query = {
"client_id": "ownerapi",
"code_challenge": code_challenge,
"code_challenge": self.code_challenge,
"code_challenge_method": "S256",
"redirect_uri": "https://auth.tesla.com/void/callback",
"response_type": "code",
Expand All @@ -394,32 +360,36 @@ async def get_authorization_code(self) -> Text:
}
url = yarl.URL("https://auth.tesla.com/oauth2/v3/authorize")
url = url.update_query(query)
_LOGGER.debug("Getting sso auth token from %s", url)
resp = await self.websession.get(url)
html = await resp.text()
soup: BeautifulSoup = BeautifulSoup(html, "html.parser")
data = get_inputs(soup)
data["identity"] = self.oauth["email"]
data["credential"] = self.oauth["password"]
data["identity"] = self.email
data["credential"] = self.password
resp = await self.websession.post(url, data=data)
_process_resp(resp)
code_url = URL(resp.history[-1].url)
self.generate_oauth(authorization_token=code_url.query.get("code"))
return code_url.query.get("code")

async def get_sso_auth_token(self):
async def get_sso_auth_token(self, code):
"""Get sso auth token."""
# https://tesla-api.timdorr.com/api-basics/authentication#step-2-obtain-an-authorization-code
_LOGGER.debug("Requesting new oauth token using %s", self.oauth["grant_type"])
if self.oauth["grant_type"] == "authorization_code":
auth = await self.__open(
"/oauth2/v3/token",
"post",
data=self.oauth,
baseurl="https://auth.tesla.com",
)
else:
auth = await self.__open("/oauth/token", "post", data=self.oauth)
return auth
_LOGGER.debug("Requesting new sso oauth token using sso auth code")
if not code:
_LOGGER.debug("No authorization code provided")
return
oauth = {
"client_id": "ownerapi",
"grant_type": "authorization_code",
"code": code,
"code_verifier": self.code_verifier,
"redirect_uri": "https://auth.tesla.com/void/callback",
}
auth = await self.websession.post(
"https://auth.tesla.com/oauth2/v3/token", data=oauth,
)
return await auth.json()

async def refresh_access_token(self, refresh_token):
"""Refresh access token from sso."""
Expand All @@ -434,10 +404,10 @@ async def refresh_access_token(self, refresh_token):
"refresh_token": refresh_token,
"scope": "openid email offline_access",
}
auth = await self.__open(
"/oauth2/v3/token", "post", data=oauth, baseurl="https://auth.tesla.com",
auth = await self.websession.post(
"https://auth.tesla.com/oauth2/v3/token", data=oauth,
)
return auth
return await auth.json()

async def get_bearer_token(self, access_token):
"""Get bearer token. This is used by the owners API."""
Expand All @@ -453,8 +423,10 @@ async def get_bearer_token(self, access_token):
head = {
"Authorization": f"Bearer {access_token}",
}
auth = await self.__open("/oauth/token", "post", headers=head, data=oauth)
return auth
auth = await self.websession.post(
"https://owner-api.teslamotors.com/oauth/token", headers=head, data=oauth
)
return await auth.json()


def get_inputs(soup: BeautifulSoup, searchfield=None) -> Dict[str, str]:
Expand Down
8 changes: 0 additions & 8 deletions tests/tesla_mock.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@ def __init__(self, monkeypatch) -> None:
Controller, "get_last_update_time", self.mock_get_last_update_time
)
self._monkeypatch.setattr(Controller, "update", self.mock_update)
self._monkeypatch.setattr(
Connection, "generate_oauth", self.mock_generate_oauth
)

def mock_connect(self, *args, **kwargs):
# pylint: disable=unused-argument
Expand Down Expand Up @@ -95,11 +92,6 @@ def mock_update(self, *args, **kwargs):
""" Mock controller's update method."""
return self.controller_update()

def mock_generate_oauth(self, *args, **kwargs):
# pylint: disable=unused-argument
""" Mock connection's generate_oauth method."""
return self.connection_generate_oauth()

@staticmethod
def controller_connect():
""" Monkeypatch for controller.connect()."""
Expand Down

0 comments on commit ff99a2f

Please sign in to comment.