Skip to content

Commit

Permalink
Issue #679 ldap3 library implemented instead of python_ldap
Browse files Browse the repository at this point in the history
  • Loading branch information
ivan committed Oct 21, 2019
1 parent 23a4525 commit 90c6dc3
Show file tree
Hide file tree
Showing 7 changed files with 303 additions and 96 deletions.
2 changes: 1 addition & 1 deletion components/server/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ bottle-mongo==0.3.0
gevent==1.4.0
lxml==4.4.1
pymongo==3.9.0
python-ldap==3.2.0
ldap3==2.6.1
6 changes: 4 additions & 2 deletions components/server/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ ignore_missing_imports = true
[mypy-gevent]
ignore_missing_imports = true

[mypy-ldap]
[mypy-ldap3]
ignore_missing_imports = true

[mypy-ldap3.core]
ignore_missing_imports = true

[mypy-lxml.html.clean]
Expand All @@ -24,4 +27,3 @@ ignore_missing_imports = true

[mypy-pymongo.database]
ignore_missing_imports = true

59 changes: 39 additions & 20 deletions components/server/src/routes/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,16 @@
import re
from typing import cast, Dict, Tuple
import urllib.parse
import hashlib
import base64

from pymongo.database import Database
import ldap
import bottle
from ldap3 import Server, Connection, ALL
from ldap3.core import exceptions

import bottle
from database import sessions
from utilities.functions import uuid
from utilities.ldap import LDAPObject
from utilities.type import SessionId


Expand All @@ -31,36 +33,53 @@ def set_session_cookie(session_id: str, expires_datetime: datetime) -> None:
options["domain"] = domain
bottle.response.set_cookie("session_id", session_id, **options)

def check_password(ssha_ldap_salted_password, password):
"""Checks the OpenLDAP tagged digest against the given password"""

if ssha_ldap_salted_password[:6] != b'{SSHA}':
logging.warning("Only SSHA LDAP password digest supported!")
raise exceptions.LDAPInvalidAttributeSyntaxResult

digest_salt_b64 = ssha_ldap_salted_password[6:] # strip {SSHA}

digest_salt = base64.b64decode(digest_salt_b64)
digest = digest_salt[:20]
salt = digest_salt[20:]

sha = hashlib.sha1(bytes(password, 'utf-8')) #nosec
sha.update(salt) #nosec

return digest == sha.digest()

@bottle.post("/login")
def login(database: Database) -> Dict[str, bool]:
"""Log the user in."""
# Pylint can't find the ldap.* constants for some reason, turn off the error message:
# pylint: disable=no-member
credentials = dict(bottle.request.json)
unsafe_characters = re.compile(r"[^\w ]+", re.UNICODE)
username = re.sub(unsafe_characters, "", credentials.get("username", "no username given"))
ldap_root_dn = os.environ.get("LDAP_ROOT_DN", "dc=example,dc=org")
ldap_url = os.environ.get("LDAP_URL", "ldap://localhost:389")
ldap_lookup_user = os.environ.get("LDAP_LOOKUP_USER", "admin")
ldap_lookup_user_password = os.environ.get("LDAP_LOOKUP_USER_PASSWORD", "admin")
ldap_server = ldap.initialize(ldap_url)

try:
ldap_server.simple_bind_s(f"cn={ldap_lookup_user},{ldap_root_dn}", ldap_lookup_user_password)
result = ldap_server.search_s(
ldap_root_dn, ldap.SCOPE_SUBTREE, f"(|(uid={username})(cn={username}))", ['dn', 'uid', 'cn'])
if result:
logging.info("LDAP search result: %s", result)
username = LDAPObject(result[0][1]).cn
else:
raise ldap.INVALID_CREDENTIALS
ldap_server.simple_bind_s(f"cn={username},{ldap_root_dn}", credentials.get("password"))
except (ldap.INVALID_CREDENTIALS, ldap.UNWILLING_TO_PERFORM, ldap.INVALID_DN_SYNTAX,
ldap.SERVER_DOWN) as reason:
logging.warning("Couldn't bind cn=%s,%s: %s", username, ldap_root_dn, reason)
ldap_server = Server(ldap_url, get_info=ALL)
with Connection(ldap_server,
user=f"cn={ldap_lookup_user},{ldap_root_dn}", password=ldap_lookup_user_password) as conn:
if not conn.bind():
username = ldap_lookup_user
raise exceptions.LDAPBindError

conn.search(ldap_root_dn, f"(|(uid={username})(cn={username}))", attributes=['userPassword'])
result = conn.entries[0]
password = credentials.get("password", "no password given")
if not check_password(result.userPassword.value, password):
return dict(ok=False)

except Exception as reason: # pylint: disable=broad-except
logging.warning("LDAP error for cn=%s,%s: %s", username, ldap_root_dn, reason)
return dict(ok=False)
finally:
ldap_server.unbind_s()

session_id, session_expiration_datetime = generate_session()
sessions.upsert(database, username, session_id, session_expiration_datetime)
set_session_cookie(session_id, session_expiration_datetime)
Expand Down
14 changes: 0 additions & 14 deletions components/server/src/utilities/ldap.py

This file was deleted.

68 changes: 68 additions & 0 deletions components/server/tests/unittests/database/test_sessions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
"""Test the sessions."""

import unittest
from datetime import datetime, timedelta
from unittest.mock import patch, MagicMock
from src.database import sessions

class SessionsTest(unittest.TestCase):
"""Unit tests for sessions class."""

def test_upsert(self):
"""Test upsert function."""
database = MagicMock()
database.sessions = MagicMock()
database.sessions.update = MagicMock()
self.assertIsNone(
sessions.upsert(database=database, username='un', session_id='5',
session_expiration_datetime=datetime(2019, 10, 18, 19, 22, 5, 99)))
database.sessions.update.assert_called_with(
{'user': 'un'}, {'user': 'un', 'session_id': '5',
'session_expiration_datetime': datetime(2019, 10, 18, 19, 22, 5, 99)}, upsert=True)

def test_delete(self):
"""Test delete function."""
database = MagicMock()
database.sessions = MagicMock()
database.sessions.delete_one = MagicMock()
self.assertIsNone(sessions.delete(database=database, session_id='5'))
database.sessions.delete_one.assert_called_with({'session_id': '5'})

def test_valid(self):
"""Test valid function."""
session_obj = MagicMock()
session_obj.get = MagicMock(return_value=(datetime.now() + timedelta(seconds=5)))
database = MagicMock()
database.sessions = MagicMock()
database.sessions.find_one = MagicMock(return_value=session_obj)
self.assertTrue(sessions.valid(database=database, session_id='5'))
database.sessions.find_one.assert_called_with({'session_id': '5'})

def test_valid_min_date(self):
"""Test valid function with min date."""
session_obj = MagicMock()
session_obj.get = MagicMock(return_value=datetime.min)
database = MagicMock()
database.sessions = MagicMock()
database.sessions.find_one = MagicMock(return_value=session_obj)
self.assertFalse(sessions.valid(database=database, session_id='5'))
database.sessions.find_one.assert_called_with({'session_id': '5'})

def test_valid_session_not_found(self):
"""Test valid function when the session is not found."""
database = MagicMock()
database.sessions = MagicMock()
database.sessions.find_one = MagicMock(return_value=None)
self.assertFalse(sessions.valid(database=database, session_id='5'))
database.sessions.find_one.assert_called_with({'session_id': '5'})

@patch('bottle.request')
def test_user(self, bottle_mock):
"""Test user function."""
bottle_mock.get_cookie = MagicMock(return_value=5)
database = MagicMock()
database.sessions = MagicMock()
database.sessions.find_one = MagicMock(return_value={"user": "OK"})

self.assertEqual('OK', sessions.user(database=database))
database.sessions.find_one.assert_called_with({'session_id': 5})
Loading

0 comments on commit 90c6dc3

Please sign in to comment.