diff options
| author | Tomasz Kramkowski <tomasz@kramkow.ski> | 2023-03-24 19:32:48 +0000 | 
|---|---|---|
| committer | Tomasz Kramkowski <tomasz@kramkow.ski> | 2023-03-24 20:25:01 +0000 | 
| commit | 409721a23691a2cd865e2de1912cb0fa47e26680 (patch) | |
| tree | 6bf0bebfca6de2c6ea74faed7f19af7e3c9e4227 | |
| parent | 3a1374195b87356b7a50e6daa9782b6f62428c34 (diff) | |
| download | paste-409721a23691a2cd865e2de1912cb0fa47e26680.tar.gz paste-409721a23691a2cd865e2de1912cb0fa47e26680.tar.xz paste-409721a23691a2cd865e2de1912cb0fa47e26680.zip | |
authenticate: rewrite
Fixes a few core issues:
- No longer uses the Bearer type, instead using a custom APIKey type,
  this is because this has nothing to do with OAuth 2.0 bearer tokens.
- Validation is moved into the middleware, with the check_auth function
  renamed to check_token with the sole job of checking the token, this
  makes testing easier
- Validation handles more cases
| -rw-r--r-- | paste/__init__.py | 25 | 
1 files changed, 15 insertions, 10 deletions
| diff --git a/paste/__init__.py b/paste/__init__.py index f8985eb..482c534 100644 --- a/paste/__init__.py +++ b/paste/__init__.py @@ -173,13 +173,7 @@ def open_database(app: App, environ: Env, start_response: StartResponse) -> Resp          return app(environ, start_response) -def check_auth(conn: Connection, auth: Optional[str]) -> bool: -    if not auth or not auth.startswith("Bearer "): -        return False -    try: -        token = b64decode(auth.removeprefix("Bearer ").encode()) -    except binascii.Error: -        return False +def check_token(conn: Connection, token: bytes) -> bool:      (count,) = conn.execute(          "SELECT COUNT(*) FROM token WHERE hash = sha256(?)", (token,)      ).fetchone() @@ -188,9 +182,20 @@ def check_auth(conn: Connection, auth: Optional[str]) -> bool:  @middleware  def authenticate(app: App, environ: Env, start_response: StartResponse) -> Response: -    conn = environ["paste.db_conn"] -    token = environ.get("HTTP_AUTHORIZATION") -    if environ["REQUEST_METHOD"] in {"GET", "HEAD"} or check_auth(conn, token): +    def check_auth(): +        value = environ.get("HTTP_AUTHORIZATION") +        if not isinstance(value, str): +            return False +        if not value.startswith("APIKey "): +            return False +        value = value.removeprefix("APIKey ") +        try: +            value = b64decode(value.encode(), validate=True) +        except (binascii.Error, UnicodeEncodeError): +            return False +        return check_token(environ["paste.db_conn"], value) + +    if environ["REQUEST_METHOD"] in {"GET", "HEAD"} or check_auth():          return app(environ, start_response)      return simple_response(start_response, "401 Unauthorized") | 
