From 88e3d9c518dabb0c939249142e439b73c41c07d8 Mon Sep 17 00:00:00 2001 From: Roger Gammans Date: Sun, 6 Mar 2022 21:47:10 +0000 Subject: [PATCH 1/7] Add PKCE Authentication support. --- tests/auth.py | 183 +++++++++++++++++++++++++++++++++++++++++++++++++- xero/auth.py | 114 ++++++++++++++++++++++++++++++- 2 files changed, 295 insertions(+), 2 deletions(-) diff --git a/tests/auth.py b/tests/auth.py index afe45ec..1660441 100644 --- a/tests/auth.py +++ b/tests/auth.py @@ -5,12 +5,16 @@ from mock import Mock, patch from six.moves.urllib.parse import parse_qs, urlparse +import requests + from xero.api import Xero from xero.auth import ( OAuth2Credentials, PartnerCredentials, PrivateCredentials, PublicCredentials, + OAuth2PKCECredentials, + PKCEAuthReceiver ) from xero.constants import XERO_OAUTH2_AUTHORIZE_URL from xero.exceptions import ( @@ -25,7 +29,7 @@ class PublicCredentialsTest(unittest.TestCase): @patch("requests.post") def test_initial_constructor(self, r_post): - "Initial construction causes a requst to get a request token" + "Initial construction causes a request to get a request token" r_post.return_value = Mock( status_code=200, text="oauth_token=token&oauth_token_secret=token_secret" ) @@ -570,3 +574,180 @@ def test_set_default_tenant_raises_exception(self): ) with self.assertRaises(XeroException): credentials.set_default_tenant() + + +class PKCERequestHandlerDummy: + def __init__(self,): + self.error = None + self.is_shutdown = False + self.success = False + + def send_error_page(self, error): + self.error = error + + def send_access_ok(self, ): + self.success = True + + def shutdown(self,): + self.is_shutdown = True + +class PKCECredentialsTest(unittest.TestCase): + # Mostly the same in principle as the Oauth2 ones, + # but oine include one where behavior is difeereny + callback_uri = "http://localhost:8123/callback" + + def setUp(self): + super().setUp() + # Create an expired token to be used by tests + self.expired_token = { + "access_token": "1234567890", + "expires_in": 1800, + "token_type": "Bearer", + "refresh_token": "0987654321", + # 'expires_at': datetime.utcnow().timestamp()} + "expires_at": time.time(), + } + + def test_verification_using_bad_auth_uri(self,): + credentials = OAuth2PKCECredentials( + "client_id", "client_secret", auth_state="test_state" + ) + bad_auth_params = {'error': 'access_denied', 'state': credentials.auth_state} + + rhandler = PKCERequestHandlerDummy() + credentials.verify_url(bad_auth_params, rhandler) + self.assertEqual(rhandler.error, 'access_denied') + self.assertTrue(rhandler.is_shutdown, ) + + def test_verification_using_bad_state(self,): + credentials = OAuth2PKCECredentials( + "client_id", "client_secret", auth_state="test_state" + ) + bad_auth_params = {'code': '213456789', 'state': "wrong_state"} + rhandler = PKCERequestHandlerDummy() + credentials.verify_url(bad_auth_params, rhandler) + self.assertEqual(rhandler.error, 'State Mismatch') + self.assertTrue(rhandler.is_shutdown, ) + + @patch("requests_oauthlib.OAuth2Session.request") + def test_verification_success(self, r_request): + credentials = OAuth2PKCECredentials( + "client_id", "client_secret", auth_state="test_state", + port=8123, + ) + params = { + 'code': ['0123456789'], + 'scope': credentials.scope, + 'state': [ credentials.auth_state ] + } + now = datetime.now().timestamp() + with patch.object(requests,'post', return_value=Mock( + status_code=200, + request=Mock(headers={}, body=""), + headers={}, + json = lambda :{ + "access_token":"1234567890", + "expires_at": now + 1800, + "token_type":"Bearer", + "refresh_token":"0987654321"} + )) as r_request: + rhandler = PKCERequestHandlerDummy() + credentials.verify_url(params, rhandler) + self.assertIs(rhandler.error, None) + self.assertTrue(r_request.called) + self.assertTrue(credentials.token) + self.assertTrue(credentials.oauth) + self.assertFalse(credentials.expired()) + + # Finally test the state + self.assertEqual( + credentials.state, + { + "client_id": credentials.client_id, + "client_secret": credentials.client_secret, + "auth_state": credentials.auth_state, + "scope": credentials.scope, + "user_agent": credentials.user_agent, + "token": credentials.token, + "callback_uri": self.callback_uri + }, + ) + + def test_verification_failure(self,): + credentials = OAuth2PKCECredentials( + "client_id", "client_secret", auth_state="test_state" + ) + params = { + 'code': ['0123456789'], + 'scope': credentials.scope, + 'state': [ credentials.auth_state ] + } + now = datetime.now().timestamp() + with patch.object(requests,'post', return_value=Mock( + status_code=400, + request=Mock(headers={}, body=""), + headers={}, + json = lambda :{"error":"invalid_grant"} + )): + rhandler = PKCERequestHandlerDummy() + credentials.verify_url(params, rhandler) + + self.assertIsInstance(credentials.error, XeroAccessDenied) + + def test_logon_opens_a_webbrowser(self,): + credentials = OAuth2PKCECredentials( + "client_id", "client_secret", auth_state="test_state", + request_handler= PKCERequestHandlerDummy, + port = 8123 + ) + server = Mock() + with patch('http.server.HTTPServer', return_value=server) as hs, \ + patch('webbrowser.open') as wb: + credentials.logon() + + self.assertTrue(wb.called) + server.serve_forerver.has_been_called() + portdata = hs.call_args[0][0] + partial = hs.call_args[0][1] + self.assertEqual(portdata, ('', 8123)) + self.assertEqual(partial.func, PKCERequestHandlerDummy ) + self.assertEqual(partial.args, (credentials, ) ) + + + +class PKCECallbackHandlerTests(unittest.TestCase): + def setUp(self,): + # PKCEAuthReciever's base class has a non-trivial + # handle method which is called by it's constructor. + # For tests we need to bypass this, so we can probe + # the do_GET behaviour - hence this subclass. + class TestRx(PKCEAuthReceiver): + def handle(self,*args): + pass + + import http.server + self.pkce_manager = Mock() + self.handler = TestRx(self.pkce_manager, + request=Mock(), + client_address=None, + server=Mock()) + + + def test_going_somewhere_other_than_callback_errors(self,): + self.handler.path="/foo/bar/baz.html" + self.handler.send_error_page=Mock() + self.handler.do_GET() + self.handler.send_error_page.assert_called_with("Unknown endpoint") + + def test_going_somewhere_other_than_callback_errors(self,): + self.handler.path="/callback?value=123&something=foo&different=bar" + self.handler.send_error_page=Mock() + self.handler.do_GET() + self.pkce_manager.verify_url.assert_called_with({ + 'value': ['123'], + 'something': ['foo'], + 'different': ['bar'], + }, + self.handler + ) + diff --git a/xero/auth.py b/xero/auth.py index e6f8a0a..6e502f9 100644 --- a/xero/auth.py +++ b/xero/auth.py @@ -2,11 +2,20 @@ import datetime import requests -from six.moves.urllib.parse import parse_qs, urlencode +import http.server +import threading +import webbrowser +import secrets +import hashlib +import base64 +from functools import partial + +from six.moves.urllib.parse import parse_qs, urlencode, urlparse from oauthlib.oauth1 import SIGNATURE_HMAC, SIGNATURE_RSA, SIGNATURE_TYPE_AUTH_HEADER from requests_oauthlib import OAuth1, OAuth2, OAuth2Session + from .constants import ( ACCESS_TOKEN_URL, AUTHORIZE_URL, @@ -701,3 +710,106 @@ def _handle_error_response(response): raise XeroNotAvailable(response) else: raise XeroExceptionUnknown(response) + + + +class PKCEAuthReceiver(http.server.BaseHTTPRequestHandler): + def __init__(self, credmanager , *args,**kwargs): + self.credmanager = credmanager + super().__init__(*args,**kwargs) + + @staticmethod + def close_server(s): + s.shutdown() + + def do_GET(self,*args): + request = urlparse(self.path) + params = parse_qs(request.query) + + if request.path == "/callback": + self.credmanager.verify_url(params,self) + else: + self.send_error_page("Unknown endpoint") + + def send_error_page(self,error): + print("Error:",error) + + def send_access_ok(self,): + print("LOGIN SUCCESS") + self.shutdown() + + def shutdown(self): + # Launch a thread to close our socket cleanly. + threading.Thread(target=self.__class__.close_server, args=(self.server,)).start() + + +class OAuth2PKCECredentials(OAuth2Credentials): + def __init__(self,*args,**kwargs): + self.port = kwargs.pop('port',8080) + self.runserver = kwargs.pop('handle_flow',True) + # Xero requires between 43 adn 128 bytes, it fails + # with invlaid grant if this is not long enough + self.verifier = kwargs.pop('verifier',secrets.token_urlsafe(64)) + self.handler_kls = kwargs.pop('request_handler', PKCEAuthReceiver ) + self.error = None + if isinstance(self.verifier,str): + self.verifier = self.verifier.encode('ascii') + kwargs.setdefault('callback_uri',f"http://localhost:{self.port}/callback") + super().__init__(*args,**kwargs) + + def logon(self,): + challenge = str(base64.urlsafe_b64encode(hashlib.sha256(self.verifier).digest())[:-1],'ascii') + url_base = super().generate_url() + webbrowser.open(url_base +f"&code_challenge={challenge}&code_challenge_method=S256") + self.wait_for_callback() + + def wait_for_callback(self,): + listen_to = ('',self.port) + s = http.server.HTTPServer(listen_to, partial(self.handler_kls, self) ) + s.serve_forever() + if self.error: + raise self.error + + def verify_url(self,params,reqhandler): + error = params.get('error',None) + if error: + self.handle_error(error,reqhandler) + return + + if params['state'][0] != self.state['auth_state']: + self.handle_error("State Mismatch",reqhandler) + return + + code = params.get('code',None) + if code: + try: + self.get_token(code[0]) + except Exception as e: + self.error = e + reqhandler.send_error_page(str(e)) + reqhandler.shutdown() + + reqhandler.send_access_ok() + + def get_token(self,code): + resp = requests.post( + url=XERO_OAUTH2_TOKEN_URL, + data={ + 'grant_type': 'authorization_code', + 'client_id': self.client_id, + 'redirect_uri': self.callback_uri, + 'code': code, + 'code_verifier': self.verifier + } + ) + respdata = resp.json() + error = respdata.get('error',None) + if error: + raise XeroAccessDenied(error) + + self._init_oauth(respdata) + + def handle_error(self,msg,handler): + self.error = RuntimeError(msg) + handler.send_error_page(msg) + handler.shutdown() From 3a1331cec24f3fa174c62a8805bc6c1b3176211d Mon Sep 17 00:00:00 2001 From: Roger Gammans Date: Mon, 7 Mar 2022 22:04:30 +0000 Subject: [PATCH 2/7] Remove unused member-var and argument from PKCE authenticator --- xero/auth.py | 1 - 1 file changed, 1 deletion(-) diff --git a/xero/auth.py b/xero/auth.py index 6e502f9..1811dfb 100644 --- a/xero/auth.py +++ b/xero/auth.py @@ -746,7 +746,6 @@ def shutdown(self): class OAuth2PKCECredentials(OAuth2Credentials): def __init__(self,*args,**kwargs): self.port = kwargs.pop('port',8080) - self.runserver = kwargs.pop('handle_flow',True) # Xero requires between 43 adn 128 bytes, it fails # with invlaid grant if this is not long enough self.verifier = kwargs.pop('verifier',secrets.token_urlsafe(64)) From a20d68cab4194ed732f8afb39e93ba2e24995a61 Mon Sep 17 00:00:00 2001 From: Roger Gammans Date: Mon, 7 Mar 2022 22:04:49 +0000 Subject: [PATCH 3/7] Add docs for PKCE --- README.md | 138 +++++++++++++++++++++++++++++++++++++++++++++++++++ xero/auth.py | 76 ++++++++++++++++++++++++++++ 2 files changed, 214 insertions(+) diff --git a/README.md b/README.md index c73f416..fc8ae9a 100644 --- a/README.md +++ b/README.md @@ -186,6 +186,144 @@ example at the end)*: ... ``` +### Using PKCE Credentials + +PKCE is an alternative flow for providing authentication via OAuth2, +it works largely the same as the standard OAuth2 mechanism, but unlike +the normal flow is designed to work with applications which cannot +keep private keys secure, such as desktop, mobile or single page apps where +such secrets could be extracted from the binary. (Or even the source, if it +is available). A client ID however is still required. + + +As elsewher OAuth2 tokens have a 30 minute expiry, But can be only swapped for a new +token, if a the `offline_access` scope is requested. + + Xero documentation on the PKCE flow can be found at + +[here](https://developer.xero.com/documentation/guides/oauth2/pkce-flow) +. +The procedure +for creating and authenticating credentials is as follows *(with a CLI +example at the end)*: + + 1) [Register your app](https://developer.xero.com/myapps) with Xero, using a + redirect URI which will be served by your app in order to complete the + authorisation e.g. `http://localhost:/callback/`. + You can chose any port, anc can pass it to the credentials object on construction, + allow with the the Client Id you are provded with. + + 2) Construct an `OAuth2Credentials` instance using the details from the first + step. + ```python + >>> from xero.auth import OAuth2Credentials + >>> + >>> credentials = OAuth2PKCECredentials(client_id, port=my_port) + ``` + If neccessary pass in a list of scopes to define the scopes required by your + app. E.g. if write access is required to transactions and payroll employees: + + ```python + >>> from xero.constants import XeroScopes + >>> + >>> my_scope = [XeroScopes.ACCOUNTING_TRANSACTIONS, + >>> XeroScopes.PAYROLL_EMPLOYEES] + >>> credentials = OAuth2Credentials(client_id, scope=my_scope + >>> port=my_port) + ``` + The default scopes are `['offline_access', 'accounting.transactions.read', + 'accounting.contacts.read']`. `offline_access` is required in order for + tokens to be refreshable. For more details on scopes see Xero's + [documentation]( https://developer.xero.com/documentation/oauth2/scopes). + + 3) Call `credentials.logon()` . This will open a browser window, an visit + a Xero authentication page. + + ```python + >>> credentials.logon() + ``` + + As well as opening that browser window the Authenticator will start + up a local webserver on the provided port, to collect the tokens + Xero returns. + + The default PCKEAuthReceiver class has no reponse + pages defined so the browser will show an error, on empty page + for all transactions. But the application is now authorised and + will continue. + If you wish you can override the `send_access_ok()` method, and the + `send_error_page()` method to create a more userfriendly experience. + + In either case once the callback url has been visited the local + server will shutdown. + + + 4) You can now continue as per the normal OAuth2 flow. + Now the credentials may be used to authorize a Xero session. As OAuth2 + allows authentication for multiple Xero Organisations, it is necessary to + set the tenant_id against which the xero client's queries will run. + ```python + >>> from xero import Xero + >>> # Use the first xero organisation (tenant) permitted + >>> credentials.set_default_tenant() + >>> xero = Xero(credentials) + >>> xero.contacts.all() + >>> ... + ``` + If the scopes supplied in Step 2 did not require access to organisations + (e.g. when only requesting scopes for single sign) it will not be + possible to make requests with the Xero API and `set_default_tenant()` will + raise an exception. + + To pick from multiple possible Xero organisations the `tenant_id` may be set + explicitly: + ```python + >>> tenants = credentials.get_tenants() + >>> credentials.tenant_id = tenants[1]['tenantId'] + >>> xero = Xero(credentials) + ``` + `OAuth2Credentials.__init__()` accepts `tenant_id` as a keyword argument. + + 5) When using the API over an extended period, you will need to exchange tokens + when they expire. If a refresh token is available, it can be used to + generate a new token: + ```python + >>> if credentials.expired(): + >>> credentials.refresh() + >>> # Then store the new credentials or token somewhere for future use: + >>> cred_state = credentials.state + >>> # or + >>> new_token = credentials.token + + **Important**: ``credentials.state`` changes after a token swap. Be sure to + persist the new state. + + ``` + #### CLI OAuth2 App Example + This example shows authorisation, automatic token refreshing and API use in + a Django app which has read/write access to contacts and transactions. + +Each time this app starts it asks for authentication, but you +could consider using the user `keyring` to store tokens. + + ```python + from xero import Xero + from xero.auth import OAuth2PKCECredentials + from xero.constants import XeroScopes + + # Get client_id, client_secret from config file or settings then + credentials = OAuth2PKCECredentials( + client_id, port=8080, + scope=[XeroScopes.OFFLINE_ACCESS, XeroScopes.ACCOUNTING_CONTACTS, + XeroScopes.ACCOUNTING_TRANSACTIONS] + ) + credentials.logon() + credentials.set_default_tenant() + + for contacts in xero.contacts.all() + print contact["Name"] + ``` + ### Older authentication methods ### In the past, Xero had the concept of "Public", "Private", and "Partner" diff --git a/xero/auth.py b/xero/auth.py index 1811dfb..cc9707d 100644 --- a/xero/auth.py +++ b/xero/auth.py @@ -714,6 +714,15 @@ def _handle_error_response(response): class PKCEAuthReceiver(http.server.BaseHTTPRequestHandler): + """ This is an http request processsor for server running on localhost, + used by the PKCE auth system. + Xero will redirect the browser after auth, from which we + can collect the toke Xero provides. + + You can subclass this and override the `send_error_page` and + `send_access_ok` methods to customise the sucess and failure + pages displayed in the browser. + """ def __init__(self, credmanager , *args,**kwargs): self.credmanager = credmanager super().__init__(*args,**kwargs) @@ -732,18 +741,79 @@ def do_GET(self,*args): self.send_error_page("Unknown endpoint") def send_error_page(self,error): + """Display an Error page. + Override this for a custom page. + """ print("Error:",error) def send_access_ok(self,): + """Display a success page" + Override this to provide a custom page. + """ print("LOGIN SUCCESS") self.shutdown() def shutdown(self): + """Start shutdowning our server and return immediately""" # Launch a thread to close our socket cleanly. threading.Thread(target=self.__class__.close_server, args=(self.server,)).start() class OAuth2PKCECredentials(OAuth2Credentials): + """An object wrapping the PKCE credential flow for Xero access. + + Usage: + 1) Construct an `OAuth2Credentials` instance: + + >>> from xero.auth import OAuth2PKCECredentials + >>> credentials = OAuth2Credentials(client_id,None, port=8080, + scope=scope) + + A webserver will be setup to listen on the provded port + number which is used for the Auth callback. + + 2) Send the login request. + >>> credentials.logon() + + This will open a browser window which will naviage to a Xero + login page. The Use should grant your application access (or not), + and will be redirected to a locally running webserver to capture + the auth tokens. + + + 3) Use the credentials. It is usually necessary to set the tenant_id (Xero + organisation id) to specify the organisation against which the queries should + run: + >>> from xero import Xero + >>> credentials.set_default_tenant() + >>> xero = Xero(credentials) + >>> xero.contacts.all() + ... + + To use a different organisation, set credentials.tenant_id: + >>> tenants = credentials.get_tenants() + >>> credentials.tenant_id = tenants[1]['tenantId'] + + 4) If a refresh token is available, it can be used to generate a new token: + >>> if credentials.expired(): + >>> credentials.refresh() + + Note that in order for tokens to be refreshable, Xero API requires + `offline_access` to be included in the scope. + + :param port: the port the local webserver will listen ro + :param verifier: (optional) a string verifier token if not + provided the module will generate it's own. + :param request_handler: An HTTP request handler class. This will + be used to handler the callback request. If + you wish to customise your error page, this is + where you pass in you custom class. + :param callback_uri: Allow customisation of the callback uri. Only + useful if you've customised the request_handler + to match. + + :param scope: Inhereited from Oath2Credentials.. + """ def __init__(self,*args,**kwargs): self.port = kwargs.pop('port',8080) # Xero requires between 43 adn 128 bytes, it fails @@ -753,10 +823,12 @@ def __init__(self,*args,**kwargs): self.error = None if isinstance(self.verifier,str): self.verifier = self.verifier.encode('ascii') + kwargs.setdefault('callback_uri',f"http://localhost:{self.port}/callback") super().__init__(*args,**kwargs) def logon(self,): + """Launch PKCE auth process and wait for completion""" challenge = str(base64.urlsafe_b64encode(hashlib.sha256(self.verifier).digest())[:-1],'ascii') url_base = super().generate_url() webbrowser.open(url_base +f"&code_challenge={challenge}&code_challenge_method=S256") @@ -770,6 +842,8 @@ def wait_for_callback(self,): raise self.error def verify_url(self,params,reqhandler): + """Used to verify the parameters xero returns in the + redirect callback""" error = params.get('error',None) if error: self.handle_error(error,reqhandler) @@ -791,6 +865,8 @@ def verify_url(self,params,reqhandler): reqhandler.send_access_ok() def get_token(self,code): + # Does the third leg, to get the actual auth token from Xero, + # once the authentication has been 'approved' by the user resp = requests.post( url=XERO_OAUTH2_TOKEN_URL, data={ From c167e94ccb1345aaf5953fb0bc02ca916552d15d Mon Sep 17 00:00:00 2001 From: Roger Gammans Date: Tue, 8 Mar 2022 20:32:20 +0000 Subject: [PATCH 4/7] Fix flake8 style issues --- tests/auth.py | 60 ++++++++++++++----------------- xero/auth.py | 97 ++++++++++++++++++++++++++++----------------------- 2 files changed, 81 insertions(+), 76 deletions(-) diff --git a/tests/auth.py b/tests/auth.py index 1660441..2fa89d4 100644 --- a/tests/auth.py +++ b/tests/auth.py @@ -591,6 +591,7 @@ def send_access_ok(self, ): def shutdown(self,): self.is_shutdown = True + class PKCECredentialsTest(unittest.TestCase): # Mostly the same in principle as the Oauth2 ones, # but oine include one where behavior is difeereny @@ -638,18 +639,18 @@ def test_verification_success(self, r_request): params = { 'code': ['0123456789'], 'scope': credentials.scope, - 'state': [ credentials.auth_state ] + 'state': [credentials.auth_state] } now = datetime.now().timestamp() - with patch.object(requests,'post', return_value=Mock( + with patch.object(requests, 'post', return_value=Mock( status_code=200, request=Mock(headers={}, body=""), headers={}, - json = lambda :{ - "access_token":"1234567890", + json=lambda: { + "access_token": "1234567890", "expires_at": now + 1800, - "token_type":"Bearer", - "refresh_token":"0987654321"} + "token_type": "Bearer", + "refresh_token": "0987654321"} )) as r_request: rhandler = PKCERequestHandlerDummy() credentials.verify_url(params, rhandler) @@ -680,14 +681,13 @@ def test_verification_failure(self,): params = { 'code': ['0123456789'], 'scope': credentials.scope, - 'state': [ credentials.auth_state ] + 'state': [credentials.auth_state] } - now = datetime.now().timestamp() - with patch.object(requests,'post', return_value=Mock( + with patch.object(requests, 'post', return_value=Mock( status_code=400, request=Mock(headers={}, body=""), headers={}, - json = lambda :{"error":"invalid_grant"} + json=lambda: {"error": "invalid_grant"} )): rhandler = PKCERequestHandlerDummy() credentials.verify_url(params, rhandler) @@ -697,8 +697,8 @@ def test_verification_failure(self,): def test_logon_opens_a_webbrowser(self,): credentials = OAuth2PKCECredentials( "client_id", "client_secret", auth_state="test_state", - request_handler= PKCERequestHandlerDummy, - port = 8123 + request_handler=PKCERequestHandlerDummy, + port=8123 ) server = Mock() with patch('http.server.HTTPServer', return_value=server) as hs, \ @@ -710,9 +710,8 @@ def test_logon_opens_a_webbrowser(self,): portdata = hs.call_args[0][0] partial = hs.call_args[0][1] self.assertEqual(portdata, ('', 8123)) - self.assertEqual(partial.func, PKCERequestHandlerDummy ) - self.assertEqual(partial.args, (credentials, ) ) - + self.assertEqual(partial.func, PKCERequestHandlerDummy) + self.assertEqual(partial.args, (credentials,)) class PKCECallbackHandlerTests(unittest.TestCase): @@ -722,32 +721,27 @@ def setUp(self,): # For tests we need to bypass this, so we can probe # the do_GET behaviour - hence this subclass. class TestRx(PKCEAuthReceiver): - def handle(self,*args): + def handle(self, *args): pass - import http.server self.pkce_manager = Mock() self.handler = TestRx(self.pkce_manager, - request=Mock(), - client_address=None, - server=Mock()) - + request=Mock(), + client_address=None, + server=Mock()) def test_going_somewhere_other_than_callback_errors(self,): - self.handler.path="/foo/bar/baz.html" - self.handler.send_error_page=Mock() + self.handler.path = "/foo/bar/baz.html" + self.handler.send_error_page = Mock() self.handler.do_GET() self.handler.send_error_page.assert_called_with("Unknown endpoint") - def test_going_somewhere_other_than_callback_errors(self,): - self.handler.path="/callback?value=123&something=foo&different=bar" - self.handler.send_error_page=Mock() + def test_revieving_callback_decodes_parms_and_sends_to_verifyurl(self,): + self.handler.path = "/callback?value=123&something=foo&different=bar" + self.handler.send_error_page = Mock() self.handler.do_GET() self.pkce_manager.verify_url.assert_called_with({ - 'value': ['123'], - 'something': ['foo'], - 'different': ['bar'], - }, - self.handler - ) - + 'value': ['123'], + 'something': ['foo'], + 'different': ['bar'], + }, self.handler) diff --git a/xero/auth.py b/xero/auth.py index cc9707d..4948e9f 100644 --- a/xero/auth.py +++ b/xero/auth.py @@ -712,7 +712,6 @@ def _handle_error_response(response): raise XeroExceptionUnknown(response) - class PKCEAuthReceiver(http.server.BaseHTTPRequestHandler): """ This is an http request processsor for server running on localhost, used by the PKCE auth system. @@ -723,30 +722,30 @@ class PKCEAuthReceiver(http.server.BaseHTTPRequestHandler): `send_access_ok` methods to customise the sucess and failure pages displayed in the browser. """ - def __init__(self, credmanager , *args,**kwargs): + def __init__(self, credmanager, *args, **kwargs): self.credmanager = credmanager - super().__init__(*args,**kwargs) + super().__init__(*args, **kwargs) @staticmethod def close_server(s): s.shutdown() - def do_GET(self,*args): + def do_GET(self, *args): request = urlparse(self.path) params = parse_qs(request.query) if request.path == "/callback": - self.credmanager.verify_url(params,self) + self.credmanager.verify_url(params, self) else: self.send_error_page("Unknown endpoint") - def send_error_page(self,error): + def send_error_page(self, error): """Display an Error page. Override this for a custom page. """ - print("Error:",error) + print("Error:", error) - def send_access_ok(self,): + def send_access_ok(self): """Display a success page" Override this to provide a custom page. """ @@ -769,7 +768,7 @@ class OAuth2PKCECredentials(OAuth2Credentials): >>> credentials = OAuth2Credentials(client_id,None, port=8080, scope=scope) - A webserver will be setup to listen on the provded port + A webserver will be setup to listen on the provded port number which is used for the Auth callback. 2) Send the login request. @@ -802,58 +801,70 @@ class OAuth2PKCECredentials(OAuth2Credentials): `offline_access` to be included in the scope. :param port: the port the local webserver will listen ro - :param verifier: (optional) a string verifier token if not - provided the module will generate it's own. + :param verifier: (optional) a string verifier token if not + provided the module will generate it's own :param request_handler: An HTTP request handler class. This will be used to handler the callback request. If - you wish to customise your error page, this is + you wish to customise your error page, this is where you pass in you custom class. :param callback_uri: Allow customisation of the callback uri. Only useful if you've customised the request_handler to match. - :param scope: Inhereited from Oath2Credentials.. + :param scope: Inhereited from Oath2Credentials. """ - def __init__(self,*args,**kwargs): - self.port = kwargs.pop('port',8080) + def __init__(self, *args, **kwargs): + self.port = kwargs.pop('port', 8080) # Xero requires between 43 adn 128 bytes, it fails # with invlaid grant if this is not long enough - self.verifier = kwargs.pop('verifier',secrets.token_urlsafe(64)) - self.handler_kls = kwargs.pop('request_handler', PKCEAuthReceiver ) + self.verifier = kwargs.pop('verifier', secrets.token_urlsafe(64)) + self.handler_kls = kwargs.pop('request_handler', PKCEAuthReceiver) self.error = None - if isinstance(self.verifier,str): + if isinstance(self.verifier, str): self.verifier = self.verifier.encode('ascii') - kwargs.setdefault('callback_uri',f"http://localhost:{self.port}/callback") - super().__init__(*args,**kwargs) + kwargs.setdefault('callback_uri', f"http://localhost:{self.port}/callback") + super().__init__(*args, **kwargs) - def logon(self,): + def logon(self): """Launch PKCE auth process and wait for completion""" - challenge = str(base64.urlsafe_b64encode(hashlib.sha256(self.verifier).digest())[:-1],'ascii') + challenge = str( + base64.urlsafe_b64encode( + hashlib.sha256( + self.verifier + ).digest() + )[:-1], 'ascii' + ) url_base = super().generate_url() - webbrowser.open(url_base +f"&code_challenge={challenge}&code_challenge_method=S256") + webbrowser.open( + f"{url_base}&code_challenge={challenge}&" + "code_challenge_method=S256" + ) self.wait_for_callback() - def wait_for_callback(self,): - listen_to = ('',self.port) - s = http.server.HTTPServer(listen_to, partial(self.handler_kls, self) ) + def wait_for_callback(self): + listen_to = ('', self.port) + s = http.server.HTTPServer( + listen_to, + partial(self.handler_kls, self) + ) s.serve_forever() if self.error: raise self.error - def verify_url(self,params,reqhandler): + def verify_url(self, params, reqhandler): """Used to verify the parameters xero returns in the redirect callback""" - error = params.get('error',None) + error = params.get('error', None) if error: - self.handle_error(error,reqhandler) + self.handle_error(error, reqhandler) return if params['state'][0] != self.state['auth_state']: - self.handle_error("State Mismatch",reqhandler) + self.handle_error("State Mismatch", reqhandler) return - code = params.get('code',None) + code = params.get('code', None) if code: try: self.get_token(code[0]) @@ -864,27 +875,27 @@ def verify_url(self,params,reqhandler): reqhandler.send_access_ok() - def get_token(self,code): + def get_token(self, code): # Does the third leg, to get the actual auth token from Xero, # once the authentication has been 'approved' by the user resp = requests.post( - url=XERO_OAUTH2_TOKEN_URL, - data={ - 'grant_type': 'authorization_code', - 'client_id': self.client_id, - 'redirect_uri': self.callback_uri, - 'code': code, - 'code_verifier': self.verifier - } - ) + url=XERO_OAUTH2_TOKEN_URL, + data={ + 'grant_type': 'authorization_code', + 'client_id': self.client_id, + 'redirect_uri': self.callback_uri, + 'code': code, + 'code_verifier': self.verifier + } + ) respdata = resp.json() - error = respdata.get('error',None) + error = respdata.get('error', None) if error: raise XeroAccessDenied(error) self._init_oauth(respdata) - def handle_error(self,msg,handler): + def handle_error(self, msg, handler): self.error = RuntimeError(msg) handler.send_error_page(msg) handler.shutdown() From 9ee7bbe732435043cdfadf3028264521d6a6ab9c Mon Sep 17 00:00:00 2001 From: Roger Gammans Date: Wed, 9 Mar 2022 19:08:06 +0000 Subject: [PATCH 5/7] Sort the imports with isort --- tests/auth.py | 7 +++---- xero/auth.py | 10 ++++------ 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/tests/auth.py b/tests/auth.py index 2fa89d4..038461d 100644 --- a/tests/auth.py +++ b/tests/auth.py @@ -1,20 +1,19 @@ import json +import requests import time import unittest from datetime import datetime, timedelta from mock import Mock, patch from six.moves.urllib.parse import parse_qs, urlparse -import requests - from xero.api import Xero from xero.auth import ( OAuth2Credentials, + OAuth2PKCECredentials, PartnerCredentials, + PKCEAuthReceiver, PrivateCredentials, PublicCredentials, - OAuth2PKCECredentials, - PKCEAuthReceiver ) from xero.constants import XERO_OAUTH2_AUTHORIZE_URL from xero.exceptions import ( diff --git a/xero/auth.py b/xero/auth.py index 4948e9f..5b4b473 100644 --- a/xero/auth.py +++ b/xero/auth.py @@ -1,21 +1,19 @@ from __future__ import unicode_literals +import base64 import datetime -import requests +import hashlib import http.server +import requests +import secrets import threading import webbrowser -import secrets -import hashlib -import base64 from functools import partial - from six.moves.urllib.parse import parse_qs, urlencode, urlparse from oauthlib.oauth1 import SIGNATURE_HMAC, SIGNATURE_RSA, SIGNATURE_TYPE_AUTH_HEADER from requests_oauthlib import OAuth1, OAuth2, OAuth2Session - from .constants import ( ACCESS_TOKEN_URL, AUTHORIZE_URL, From 98571ef48143933ac03735ea704d0ee11344cd00 Mon Sep 17 00:00:00 2001 From: Roger Gammans Date: Sat, 12 Mar 2022 19:04:41 +0000 Subject: [PATCH 6/7] Fix other typos --- tests/auth.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/auth.py b/tests/auth.py index 038461d..c6c56e4 100644 --- a/tests/auth.py +++ b/tests/auth.py @@ -593,7 +593,7 @@ def shutdown(self,): class PKCECredentialsTest(unittest.TestCase): # Mostly the same in principle as the Oauth2 ones, - # but oine include one where behavior is difeereny + # but just include tests where behavior is different. callback_uri = "http://localhost:8123/callback" def setUp(self): From 5ef2cae37575488924a11909ea9429254e1fbde0 Mon Sep 17 00:00:00 2001 From: Russell Keith-Magee Date: Sat, 11 Feb 2023 15:39:26 +0800 Subject: [PATCH 7/7] Light review and cleanup of PKCE OAuth2 flow docs. --- README.md | 131 +++++++++++++++++++++++++++--------------------------- 1 file changed, 66 insertions(+), 65 deletions(-) diff --git a/README.md b/README.md index a2027df..d2cabf1 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ pip install pyxero ### Using OAuth2 Credentials OAuth2 is an open standard authorization protocol that allows users to -provide specific permissions to apps that want to use their account. OAuth2 +provide specific permissions to apps that want to use their account. OAuth2 authentication is performed using *tokens* that are obtained using an API; these tokens are then provided with each subsequent request. @@ -188,40 +188,38 @@ def some_view_which_calls_xero(request): ### Using PKCE Credentials -PKCE is an alternative flow for providing authentication via OAuth2, -it works largely the same as the standard OAuth2 mechanism, but unlike -the normal flow is designed to work with applications which cannot -keep private keys secure, such as desktop, mobile or single page apps where -such secrets could be extracted from the binary. (Or even the source, if it -is available). A client ID however is still required. +PKCE is an alternative flow for providing authentication via OAuth2. It works +largely the same as the standard OAuth2 mechanism, but unlike the normal flow is +designed to work with applications which cannot keep private keys secure, such +as desktop, mobile or single page apps where such secrets could be extracted. A +client ID is still required. +As elsewhere, OAuth2 tokens have a 30 minute expiry, but can be only swapped for +a new token if the `offline_access` scope is requested. -As elsewher OAuth2 tokens have a 30 minute expiry, But can be only swapped for a new -token, if a the `offline_access` scope is requested. - - Xero documentation on the PKCE flow can be found at - -[here](https://developer.xero.com/documentation/guides/oauth2/pkce-flow) -. -The procedure -for creating and authenticating credentials is as follows *(with a CLI +Xero documentation on the PKCE flow can be found +[here](https://developer.xero.com/documentation/guides/oauth2/pkce-flow). The +procedure for creating and authenticating credentials is as follows *(with a CLI example at the end)*: 1) [Register your app](https://developer.xero.com/myapps) with Xero, using a redirect URI which will be served by your app in order to complete the - authorisation e.g. `http://localhost:/callback/`. - You can chose any port, anc can pass it to the credentials object on construction, - allow with the the Client Id you are provded with. + authorisation e.g. `http://localhost:/callback/`. You can chose any + port, anc can pass it to the credentials object on construction, allow with + the the Client Id you are provded with. 2) Construct an `OAuth2Credentials` instance using the details from the first step. + ```python >>> from xero.auth import OAuth2Credentials >>> >>> credentials = OAuth2PKCECredentials(client_id, port=my_port) ``` - If neccessary pass in a list of scopes to define the scopes required by your - app. E.g. if write access is required to transactions and payroll employees: + + If neccessary, pass in a list of scopes to define the scopes required by + your app. E.g. if write access is required to transactions and payroll + employees: ```python >>> from xero.constants import XeroScopes @@ -231,10 +229,12 @@ example at the end)*: >>> credentials = OAuth2Credentials(client_id, scope=my_scope >>> port=my_port) ``` + The default scopes are `['offline_access', 'accounting.transactions.read', 'accounting.contacts.read']`. `offline_access` is required in order for - tokens to be refreshable. For more details on scopes see Xero's - [documentation]( https://developer.xero.com/documentation/oauth2/scopes). + tokens to be refreshable. For more details on scopes see [Xero's + documentation on oAuth2 + scopes](https://developer.xero.com/documentation/oauth2/scopes). 3) Call `credentials.logon()` . This will open a browser window, an visit a Xero authentication page. @@ -243,25 +243,23 @@ example at the end)*: >>> credentials.logon() ``` - As well as opening that browser window the Authenticator will start - up a local webserver on the provided port, to collect the tokens - Xero returns. + The Authenticator will also start a local webserver on the provided port. + This webserver will be used to collect the tokens that Xero returns. - The default PCKEAuthReceiver class has no reponse - pages defined so the browser will show an error, on empty page - for all transactions. But the application is now authorised and - will continue. - If you wish you can override the `send_access_ok()` method, and the - `send_error_page()` method to create a more userfriendly experience. + The default `PCKEAuthReceiver` class has no reponse pages defined so the + browser will show an error, on empty page for all transactions. But the + application is now authorised and will continue. If you wish you can + override the `send_access_ok()` method, and the `send_error_page()` method + to create a more userfriendly experience. - In either case once the callback url has been visited the local - server will shutdown. - + In either case once the callback url has been visited the local server will + shutdown. + + 4) You can now continue as per the normal OAuth2 flow. Now the credentials may + be used to authorize a Xero session. As OAuth2 allows authentication for + multiple Xero Organisations, it is necessary to set the tenant_id against + which the xero client's queries will run. - 4) You can now continue as per the normal OAuth2 flow. - Now the credentials may be used to authorize a Xero session. As OAuth2 - allows authentication for multiple Xero Organisations, it is necessary to - set the tenant_id against which the xero client's queries will run. ```python >>> from xero import Xero >>> # Use the first xero organisation (tenant) permitted @@ -271,12 +269,13 @@ example at the end)*: >>> ... ``` If the scopes supplied in Step 2 did not require access to organisations - (e.g. when only requesting scopes for single sign) it will not be - possible to make requests with the Xero API and `set_default_tenant()` will - raise an exception. + (e.g. when only requesting scopes for single sign) it will not be possible + to make requests with the Xero API and `set_default_tenant()` will raise an + exception. To pick from multiple possible Xero organisations the `tenant_id` may be set explicitly: + ```python >>> tenants = credentials.get_tenants() >>> credentials.tenant_id = tenants[1]['tenantId'] @@ -287,6 +286,7 @@ example at the end)*: 5) When using the API over an extended period, you will need to exchange tokens when they expire. If a refresh token is available, it can be used to generate a new token: + ```python >>> if credentials.expired(): >>> credentials.refresh() @@ -299,37 +299,39 @@ example at the end)*: persist the new state. ``` - #### CLI OAuth2 App Example - This example shows authorisation, automatic token refreshing and API use in - a Django app which has read/write access to contacts and transactions. + +#### CLI OAuth2 App Example + +This example shows authorisation, automatic token refreshing and API use in +a Django app which has read/write access to contacts and transactions. Each time this app starts it asks for authentication, but you could consider using the user `keyring` to store tokens. - ```python - from xero import Xero - from xero.auth import OAuth2PKCECredentials - from xero.constants import XeroScopes - - # Get client_id, client_secret from config file or settings then - credentials = OAuth2PKCECredentials( - client_id, port=8080, - scope=[XeroScopes.OFFLINE_ACCESS, XeroScopes.ACCOUNTING_CONTACTS, - XeroScopes.ACCOUNTING_TRANSACTIONS] - ) - credentials.logon() - credentials.set_default_tenant() +```python +from xero import Xero +from xero.auth import OAuth2PKCECredentials +from xero.constants import XeroScopes - for contacts in xero.contacts.all() - print contact["Name"] - ``` +# Get client_id, client_secret from config file or settings then +credentials = OAuth2PKCECredentials( + client_id, port=8080, + scope=[XeroScopes.OFFLINE_ACCESS, XeroScopes.ACCOUNTING_CONTACTS, + XeroScopes.ACCOUNTING_TRANSACTIONS] +) +credentials.logon() +credentials.set_default_tenant() + +for contacts in xero.contacts.all() + print contact["Name"] +``` ### Older authentication methods ### -In the past, Xero had the concept of "Public", "Private", and "Partner" +In the past, Xero had the concept of "Public", "Private", and "Partner" applications, which each had their own authentication procedures. However, -they removed access for Public applications on 31 March 2021; Private -applications were removed on 30 September 2021. Partner applications +they removed access for Public applications on 31 March 2021; Private +applications were removed on 30 September 2021. Partner applications still exist, but the only supported authentication method is OAuth2; these are now referred to as "OAuth2 apps". As Xero no longer supports these older authentication methods, neither does PyXero. @@ -584,4 +586,3 @@ New features or bug fixes can be submitted via a pull request. If you want your pull request to be merged quickly, make sure you either include regression test(s) for the behavior you are adding/fixing, or provide a good explanation of why a regression test isn't possible. -