aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTomasz Kramkowski <tomasz@kramkow.ski>2023-03-24 19:32:48 +0000
committerTomasz Kramkowski <tomasz@kramkow.ski>2023-03-24 20:25:01 +0000
commit409721a23691a2cd865e2de1912cb0fa47e26680 (patch)
tree6bf0bebfca6de2c6ea74faed7f19af7e3c9e4227
parent3a1374195b87356b7a50e6daa9782b6f62428c34 (diff)
downloadpaste-409721a23691a2cd865e2de1912cb0fa47e26680.tar.gz
paste-409721a23691a2cd865e2de1912cb0fa47e26680.tar.xz
paste-409721a23691a2cd865e2de1912cb0fa47e26680.zip
authenticate: rewrite
Fixes a few core issues: - No longer uses the Bearer type, instead using a custom APIKey type, this is because this has nothing to do with OAuth 2.0 bearer tokens. - Validation is moved into the middleware, with the check_auth function renamed to check_token with the sole job of checking the token, this makes testing easier - Validation handles more cases
-rw-r--r--paste/__init__.py25
1 files changed, 15 insertions, 10 deletions
diff --git a/paste/__init__.py b/paste/__init__.py
index f8985eb..482c534 100644
--- a/paste/__init__.py
+++ b/paste/__init__.py
@@ -173,13 +173,7 @@ def open_database(app: App, environ: Env, start_response: StartResponse) -> Resp
return app(environ, start_response)
-def check_auth(conn: Connection, auth: Optional[str]) -> bool:
- if not auth or not auth.startswith("Bearer "):
- return False
- try:
- token = b64decode(auth.removeprefix("Bearer ").encode())
- except binascii.Error:
- return False
+def check_token(conn: Connection, token: bytes) -> bool:
(count,) = conn.execute(
"SELECT COUNT(*) FROM token WHERE hash = sha256(?)", (token,)
).fetchone()
@@ -188,9 +182,20 @@ def check_auth(conn: Connection, auth: Optional[str]) -> bool:
@middleware
def authenticate(app: App, environ: Env, start_response: StartResponse) -> Response:
- conn = environ["paste.db_conn"]
- token = environ.get("HTTP_AUTHORIZATION")
- if environ["REQUEST_METHOD"] in {"GET", "HEAD"} or check_auth(conn, token):
+ def check_auth():
+ value = environ.get("HTTP_AUTHORIZATION")
+ if not isinstance(value, str):
+ return False
+ if not value.startswith("APIKey "):
+ return False
+ value = value.removeprefix("APIKey ")
+ try:
+ value = b64decode(value.encode(), validate=True)
+ except (binascii.Error, UnicodeEncodeError):
+ return False
+ return check_token(environ["paste.db_conn"], value)
+
+ if environ["REQUEST_METHOD"] in {"GET", "HEAD"} or check_auth():
return app(environ, start_response)
return simple_response(start_response, "401 Unauthorized")