diff options
author | Tomasz Kramkowski <tomasz@kramkow.ski> | 2023-03-24 19:32:48 +0000 |
---|---|---|
committer | Tomasz Kramkowski <tomasz@kramkow.ski> | 2023-03-24 20:25:01 +0000 |
commit | 409721a23691a2cd865e2de1912cb0fa47e26680 (patch) | |
tree | 6bf0bebfca6de2c6ea74faed7f19af7e3c9e4227 | |
parent | 3a1374195b87356b7a50e6daa9782b6f62428c34 (diff) | |
download | paste-409721a23691a2cd865e2de1912cb0fa47e26680.tar.gz paste-409721a23691a2cd865e2de1912cb0fa47e26680.tar.xz paste-409721a23691a2cd865e2de1912cb0fa47e26680.zip |
authenticate: rewrite
Fixes a few core issues:
- No longer uses the Bearer type, instead using a custom APIKey type,
this is because this has nothing to do with OAuth 2.0 bearer tokens.
- Validation is moved into the middleware, with the check_auth function
renamed to check_token with the sole job of checking the token, this
makes testing easier
- Validation handles more cases
-rw-r--r-- | paste/__init__.py | 25 |
1 files changed, 15 insertions, 10 deletions
diff --git a/paste/__init__.py b/paste/__init__.py index f8985eb..482c534 100644 --- a/paste/__init__.py +++ b/paste/__init__.py @@ -173,13 +173,7 @@ def open_database(app: App, environ: Env, start_response: StartResponse) -> Resp return app(environ, start_response) -def check_auth(conn: Connection, auth: Optional[str]) -> bool: - if not auth or not auth.startswith("Bearer "): - return False - try: - token = b64decode(auth.removeprefix("Bearer ").encode()) - except binascii.Error: - return False +def check_token(conn: Connection, token: bytes) -> bool: (count,) = conn.execute( "SELECT COUNT(*) FROM token WHERE hash = sha256(?)", (token,) ).fetchone() @@ -188,9 +182,20 @@ def check_auth(conn: Connection, auth: Optional[str]) -> bool: @middleware def authenticate(app: App, environ: Env, start_response: StartResponse) -> Response: - conn = environ["paste.db_conn"] - token = environ.get("HTTP_AUTHORIZATION") - if environ["REQUEST_METHOD"] in {"GET", "HEAD"} or check_auth(conn, token): + def check_auth(): + value = environ.get("HTTP_AUTHORIZATION") + if not isinstance(value, str): + return False + if not value.startswith("APIKey "): + return False + value = value.removeprefix("APIKey ") + try: + value = b64decode(value.encode(), validate=True) + except (binascii.Error, UnicodeEncodeError): + return False + return check_token(environ["paste.db_conn"], value) + + if environ["REQUEST_METHOD"] in {"GET", "HEAD"} or check_auth(): return app(environ, start_response) return simple_response(start_response, "401 Unauthorized") |