Skip to content

Commit

Permalink
Support for external httpx client
Browse files Browse the repository at this point in the history
  • Loading branch information
CJNE committed Oct 16, 2024
1 parent 5bcad19 commit 30018da
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 53 deletions.
2 changes: 1 addition & 1 deletion pymyenergi/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.2.1
0.2.2
119 changes: 67 additions & 52 deletions pymyenergi/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
_CLIENT_ID = "2fup0dhufn5vurmprjkj599041"



class Connection:
"""Connection to myenergi API."""

Expand All @@ -30,11 +29,13 @@ def __init__(
app_password: Text = None,
app_email: Text = None,
timeout: int = 20,
asyncClient=httpx.AsyncClient(),
) -> None:
"""Initialize connection object."""
self.timeout = timeout
self.director_url = "https://director.myenergi.net"
self.base_url = None
self.asyncClient = asyncClient
self.oauth_base_url = "https://myaccount.myenergi.com"
self.username = username
self.password = password
Expand Down Expand Up @@ -66,8 +67,10 @@ async def discoverLocations(self):
if self.app_email and self.app_password:
locs = await self.get("/api/Location", oauth=True)
# check if guest location - use the first location by default
if locs["content"][0]["isGuestLocation"] == True:
self.invitation_id = locs["content"][0]["invitationData"]["invitationId"]
if locs["content"][0]["isGuestLocation"]:
self.invitation_id = locs["content"][0]["invitationData"][
"invitationId"
]

def checkAndUpdateToken(self):
# check if we have oauth credentials
Expand All @@ -81,66 +84,78 @@ async def send(self, method, url, json=None, oauth=False):
if oauth:
# check if we have oauth credentials
if self.app_email and self.app_password:
async with httpx.AsyncClient(
headers=self.oauth_headers, timeout=self.timeout
) as httpclient:
theUrl = self.oauth_base_url + url
# if we have an invitiation id, we need to add that to the query
if self.invitation_id != "":
if "?" in theUrl:
theUrl = theUrl + "&invitationId=" + self.invitation_id
else:
theUrl = theUrl + "?invitationId=" + self.invitation_id
try:
_LOGGER.debug(f"{method} {url} {theUrl}")
response = await httpclient.request(method, theUrl, json=json)
except httpx.ReadTimeout:
raise TimeoutException()
else:
_LOGGER.debug(f"{method} status {response.status_code}")
if response.status_code == 200:
return response.json()
elif response.status_code == 401:
raise WrongCredentials()
raise MyenergiException(response.status_code)
else:
_LOGGER.error("Trying to use OAuth without app credentials")

# Use Digest Auth for director.myenergi.net and s18.myenergi.net
else:
# If base URL has not been set, make a request to director to fetch it
async with httpx.AsyncClient(
auth=self.auth, headers=self.headers, timeout=self.timeout
) as httpclient:
if self.base_url is None or self.do_query_asn:
_LOGGER.debug("Get Myenergi base url from director")
try:
directorUrl = self.director_url + "/cgi-jstatus-E"
response = await httpclient.get(directorUrl)
except Exception:
_LOGGER.error("Myenergi server request problem")
_LOGGER.debug(sys.exc_info()[0])
theUrl = self.oauth_base_url + url
# if we have an invitiation id, we need to add that to the query
if self.invitation_id != "":
if "?" in theUrl:
theUrl = theUrl + "&invitationId=" + self.invitation_id
else:
self.do_query_asn = False
self._checkMyenergiServerURL(response.headers)
theUrl = self.base_url + url
theUrl = theUrl + "?invitationId=" + self.invitation_id
try:
_LOGGER.debug(f"{method} {url} {theUrl}")
response = await httpclient.request(method, theUrl, json=json)
response = await self.asyncClient.request(
method,
theUrl,
json=json,
headers=self.oauth_headers,
timeout=self.timeout,
)
except httpx.ReadTimeout:
# Make sure to query for ASN next request, might be a server problem
self.do_query_asn = True
raise TimeoutException()
else:
_LOGGER.debug(f"GET status {response.status_code}")
self._checkMyenergiServerURL(response.headers)
_LOGGER.debug(f"{method} status {response.status_code}")
if response.status_code == 200:
return response.json()
elif response.status_code == 401:
raise WrongCredentials()
# Make sure to query for ASN next request, might be a server problem
self.do_query_asn = True
raise MyenergiException(response.status_code)
else:
_LOGGER.error("Trying to use OAuth without app credentials")

# Use Digest Auth for director.myenergi.net and s18.myenergi.net
else:
# If base URL has not been set, make a request to director to fetch it
if self.base_url is None or self.do_query_asn:
_LOGGER.debug("Get Myenergi base url from director")
try:
directorUrl = self.director_url + "/cgi-jstatus-E"
response = await self.asyncClient.get(
directorUrl,
auth=self.auth,
headers=self.headers,
timeout=self.timeout,
)
except Exception:
_LOGGER.error("Myenergi server request problem")
_LOGGER.debug(sys.exc_info()[0])
else:
self.do_query_asn = False
self._checkMyenergiServerURL(response.headers)
theUrl = self.base_url + url
try:
_LOGGER.debug(f"{method} {url} {theUrl}")
response = await self.asyncClient.request(
method,
theUrl,
auth=self.auth,
headers=self.headers,
timeout=self.timeout,
json=json,
)
except httpx.ReadTimeout:
# Make sure to query for ASN next request, might be a server problem
self.do_query_asn = True
raise TimeoutException()
else:
_LOGGER.debug(f"GET status {response.status_code}")
self._checkMyenergiServerURL(response.headers)
if response.status_code == 200:
return response.json()
elif response.status_code == 401:
raise WrongCredentials()
# Make sure to query for ASN next request, might be a server problem
self.do_query_asn = True
raise MyenergiException(response.status_code)

async def get(self, url, data=None, oauth=False):
return await self.send("GET", url, data, oauth)
Expand Down

0 comments on commit 30018da

Please sign in to comment.