From 7bc36edb7b15cae90bbfa81187b63821466cdd59 Mon Sep 17 00:00:00 2001 From: Tomasz Kramkowski Date: Fri, 24 Mar 2023 19:45:17 +0000 Subject: add middleware tests --- tests/common_wsgi.py | 50 +++++++++++ tests/middleware/__init__.py | 0 tests/middleware/test_authenticate.py | 116 +++++++++++++++++++++++++ tests/middleware/test_catch_exceptions.py | 37 ++++++++ tests/middleware/test_if_none_match.py | 138 ++++++++++++++++++++++++++++++ tests/middleware/test_open_database.py | 55 ++++++++++++ tests/middleware/test_options.py | 43 ++++++++++ tests/middleware/test_validate_method.py | 44 ++++++++++ 8 files changed, 483 insertions(+) create mode 100644 tests/common_wsgi.py create mode 100644 tests/middleware/__init__.py create mode 100644 tests/middleware/test_authenticate.py create mode 100644 tests/middleware/test_catch_exceptions.py create mode 100644 tests/middleware/test_if_none_match.py create mode 100644 tests/middleware/test_open_database.py create mode 100644 tests/middleware/test_options.py create mode 100644 tests/middleware/test_validate_method.py diff --git a/tests/common_wsgi.py b/tests/common_wsgi.py new file mode 100644 index 0000000..21e8deb --- /dev/null +++ b/tests/common_wsgi.py @@ -0,0 +1,50 @@ +from dataclasses import dataclass +from types import TracebackType +from typing import Optional, Union +from wsgiref.util import setup_testing_defaults + + +@dataclass +class Response: + data: bytes + status: str + headers: list[tuple[str, str]] + exc_info: Union[ + None, + tuple[type[BaseException], BaseException, TracebackType], + tuple[None, None, None], + ] + + +def call_app(app, environ={}): + def write(_): + raise AssertionError("write was called, but should not have been") + + status = None + headers = None + exc_info = None + + def start_response(s, h, e_i=None): + nonlocal status, headers, exc_info + if isinstance(headers, list) and e_i is None: + raise AssertionError("Start response called twice") + status = s + headers = h + exc_info = e_i + return write + + setup_testing_defaults(environ) + if "QUERY_STRING" not in environ: + environ["QUERY_STRING"] = "" + resp_iter = app(environ, start_response) + assert isinstance(status, str) + assert isinstance(headers, list) + resp = b"".join(resp_iter) + if hasattr(resp_iter, "close"): + resp_iter.close() + return Response( + resp, + status, + headers, + exc_info, + ) diff --git a/tests/middleware/__init__.py b/tests/middleware/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/middleware/test_authenticate.py b/tests/middleware/test_authenticate.py new file mode 100644 index 0000000..28ccef2 --- /dev/null +++ b/tests/middleware/test_authenticate.py @@ -0,0 +1,116 @@ +from base64 import b64encode +from wsgiref.validate import validator + +import pytest + +import paste +from paste import authenticate + +from ..common_wsgi import call_app + + +@pytest.fixture +def app(): + @validator + @authenticate + @validator + def app(_, start_response): + start_response("200 OK", [("Content-Type", "text/plain")]) + return [b"Hello, world!"] + + return app + + +@pytest.mark.parametrize("method", ["GET", "HEAD"]) +def test_unauthenticated_request(app, method, monkeypatch): + monkeypatch.delattr(paste, "check_token") + environ = {"REQUEST_METHOD": method} + response = call_app(app, environ) + assert response.data == b"Hello, world!" + assert response.status == "200 OK" + assert ("Content-Type", "text/plain") in response.headers + + +@pytest.mark.parametrize("method", ["GET", "HEAD"]) +def test_unauthenticated_request_with_key(app, method, monkeypatch): + monkeypatch.delattr(paste, "check_token") + environ = { + "REQUEST_METHOD": method, + "paste.db_conn": None, + "HTTP_AUTHORIZATION": "ApiKey AAAA", + } + response = call_app(app, environ) + assert response.data == b"Hello, world!" + assert response.status == "200 OK" + assert ("Content-Type", "text/plain") in response.headers + + +@pytest.mark.parametrize("method", ["POST", "PUT", "DELETE"]) +def test_authenticate_no_header(app, method, monkeypatch): + monkeypatch.delattr(paste, "check_token") + environ = {"REQUEST_METHOD": method} + response = call_app(app, environ) + assert response.data == b"401 Unauthorized\n" + assert response.status == "401 Unauthorized" + assert ("Content-Type", "text/plain") in response.headers + + +@pytest.mark.parametrize("method", ["POST", "PUT", "DELETE"]) +@pytest.mark.parametrize("key", ["ApiKey AAAA", "APIKey AAA", "APIKey AAAA", "AAAA"]) +def test_authenticate_malformed_key(app, method, key, monkeypatch): + monkeypatch.delattr(paste, "check_token") + environ = {"REQUEST_METHOD": method, "HTTP_AUTHORIZATION": key} + response = call_app(app, environ) + assert response.data == b"401 Unauthorized\n" + assert response.status == "401 Unauthorized" + assert ("Content-Type", "text/plain") in response.headers + + +@pytest.mark.parametrize("method", ["POST", "PUT", "DELETE"]) +def test_authenticate_check_token_fail(app, method, monkeypatch): + check_token_called = False + token = b"test" + environ = { + "REQUEST_METHOD": method, + "paste.db_conn": object(), + "HTTP_AUTHORIZATION": f"APIKey {b64encode(token).decode()}", + } + + def check_token(conn, tok): + nonlocal check_token_called + assert conn == environ["paste.db_conn"] + assert tok == token + check_token_called = True + return False + + monkeypatch.setattr(paste, "check_token", check_token) + response = call_app(app, environ) + assert check_token_called + assert response.data == b"401 Unauthorized\n" + assert response.status == "401 Unauthorized" + assert ("Content-Type", "text/plain") in response.headers + + +@pytest.mark.parametrize("method", ["POST", "PUT", "DELETE"]) +def test_authenticate_check_token_success(app, method, monkeypatch): + check_token_called = False + token = b"test" + environ = { + "REQUEST_METHOD": method, + "paste.db_conn": object(), + "HTTP_AUTHORIZATION": f"APIKey {b64encode(token).decode()}", + } + + def check_token(conn, tok): + nonlocal check_token_called + assert conn == environ["paste.db_conn"] + assert tok == token + check_token_called = True + return True + + monkeypatch.setattr(paste, "check_token", check_token) + response = call_app(app, environ) + assert check_token_called + assert response.data == b"Hello, world!" + assert response.status == "200 OK" + assert ("Content-Type", "text/plain") in response.headers diff --git a/tests/middleware/test_catch_exceptions.py b/tests/middleware/test_catch_exceptions.py new file mode 100644 index 0000000..5042f7f --- /dev/null +++ b/tests/middleware/test_catch_exceptions.py @@ -0,0 +1,37 @@ +from wsgiref.validate import validator + +from paste import catch_exceptions + +from ..common_wsgi import call_app + + +def test_no_exception(): + @validator + @catch_exceptions + @validator + def app(_, start_response): + start_response("200 OK", [("Content-Type", "text/plain")]) + return [b"Hello, world!"] + + response = call_app(app) + assert response.data == b"Hello, world!" + assert response.status == "200 OK" + assert ("Content-Type", "text/plain") in response.headers + + +def test_exception(capfd): + @validator + @catch_exceptions + @validator + def app(_, start_response): + start_response("200 OK", [("Content-Type", "text/plain")]) + raise Exception("Something went wrong") + + response = call_app(app) + assert response.data == b"500 Internal Server Error\n" + assert response.status == "500 Internal Server Error" + assert ("Content-Type", "text/plain") in response.headers + out, _ = capfd.readouterr() + assert "Exception: Something went wrong\n" in out + assert response.exc_info is not None + assert isinstance(response.exc_info[1], Exception) diff --git a/tests/middleware/test_if_none_match.py b/tests/middleware/test_if_none_match.py new file mode 100644 index 0000000..8456c05 --- /dev/null +++ b/tests/middleware/test_if_none_match.py @@ -0,0 +1,138 @@ +from wsgiref.validate import validator + +import pytest + +from paste import if_none_match + +from ..common_wsgi import call_app + +ETAG = "1234" + + +@pytest.fixture +def app(): + @validator + @if_none_match + @validator + def app(environ, start_response): + body = b"Hello, world!" + etag_header = ("ETag", f'"{ETAG}"') + if environ["REQUEST_METHOD"] == "HEAD": + start_response( + "200 OK", + [ + ("Content-Type", "text/plain"), + ("Content-Length", str(len(body))), + etag_header, + ], + ) + return [] + start_response("200 OK", [("Content-Type", "text/plain"), etag_header]) + return [body] + + return app + + +@pytest.mark.parametrize("method", ["POST", "PUT", "DELETE"]) +def test_non_get_or_head_request_passthrough(app, method): + environ = {"REQUEST_METHOD": method, "HTTP_IF_NONE_MATCH": f'"{ETAG}"'} + response = call_app(app, environ) + assert response.data == b"Hello, world!" + assert response.status == "200 OK" + assert ("Content-Type", "text/plain") in response.headers + + +def test_if_none_match_header_not_set(app): + environ = {"REQUEST_METHOD": "GET"} + response = call_app(app, environ) + assert response.data == b"Hello, world!" + assert response.status == "200 OK" + assert ("Content-Type", "text/plain") in response.headers + + +@pytest.mark.parametrize("value", [f'"{ETAG}"', f'W/"{ETAG}"']) +def test_etag_matches_if_none_match_header(app, value): + environ = {"REQUEST_METHOD": "GET", "HTTP_IF_NONE_MATCH": value} + response = call_app(app, environ) + assert response.data == b"" + assert response.status == "304 Not Modified" + + +@pytest.mark.parametrize( + "value", + [ + f'"{ETAG}", "54321"', + f'"54321", "{ETAG}", "098765"', + f'"54321", "{ETAG}"', + f'W/"{ETAG}", "54321"', + f'"54321", W/"{ETAG}", "098765"', + f'"54321", W/"{ETAG}"', + ], +) +def test_etag_matches_if_none_match_header_list(app, value): + environ = {"REQUEST_METHOD": "GET", "HTTP_IF_NONE_MATCH": value} + response = call_app(app, environ) + assert response.data == b"" + assert response.status == "304 Not Modified" + + +def test_etag_matches_if_none_match_header_list2(app): + environ = {"REQUEST_METHOD": "GET", "HTTP_IF_NONE_MATCH": f'"54321", "{ETAG}"'} + response = call_app(app, environ) + assert response.data == b"" + assert response.status == "304 Not Modified" + + +def test_etag_matches_if_none_match_header_star(app): + environ = {"REQUEST_METHOD": "GET", "HTTP_IF_NONE_MATCH": "*"} + response = call_app(app, environ) + assert response.data == b"" + assert response.status == "304 Not Modified" + + +def test_missing_etag_does_not_match_if_none_match_header_star(): + @validator + @if_none_match + @validator + def app(environ, start_response): + body = b"Hello, world!" + if environ["REQUEST_METHOD"] == "HEAD": + start_response( + "200 OK", + [ + ("Content-Type", "text/plain"), + ("Content-Length", str(len(body))), + ], + ) + return [] + start_response("200 OK", [("Content-Type", "text/plain")]) + return [body] + + environ = {"REQUEST_METHOD": "GET", "HTTP_IF_NONE_MATCH": "*"} + response = call_app(app, environ) + assert response.data == b"Hello, world!" + assert response.status == "200 OK" + + +def test_etag_does_not_match_if_none_match_header(app): + environ = {"REQUEST_METHOD": "GET", "HTTP_IF_NONE_MATCH": '"54321"'} + response = call_app(app, environ) + assert response.data == b"Hello, world!" + assert response.status == "200 OK" + assert ("Content-Type", "text/plain") in response.headers + + +def test_etag_does_not_match_if_none_match_header_list(app): + environ = {"REQUEST_METHOD": "GET", "HTTP_IF_NONE_MATCH": '"123","4567","*"'} + response = call_app(app, environ) + assert response.data == b"Hello, world!" + assert response.status == "200 OK" + assert ("Content-Type", "text/plain") in response.headers + + +def test_etag_does_not_match_if_none_match_malformed_header(app): + environ = {"REQUEST_METHOD": "GET", "HTTP_IF_NONE_MATCH": "{ETAG}"} + response = call_app(app, environ) + assert response.data == b"400 Bad Request\n" + assert response.status == "400 Bad Request" + assert ("Content-Type", "text/plain") in response.headers diff --git a/tests/middleware/test_open_database.py b/tests/middleware/test_open_database.py new file mode 100644 index 0000000..43ebb07 --- /dev/null +++ b/tests/middleware/test_open_database.py @@ -0,0 +1,55 @@ +import urllib.parse +import urllib.request +from wsgiref.validate import validator + +import pytest + +from paste import open_database + +from ..common_wsgi import call_app + + +@pytest.fixture +def app(): + @validator + @open_database + @validator + def app(environ, start_response): + assert "paste.db_conn" in environ + conn = environ["paste.db_conn"] + (ver,) = conn.execute("PRAGMA user_version").fetchone() + assert ver > 0 + start_response("200 OK", [("Content-Type", "text/plain")]) + return [b"Hello, World!"] + + return app + + +def test_open_uri_memory(app): + response = call_app(app, environ={"PASTE_DB": "file::memory:"}) + assert response.status == "200 OK" + assert response.data == b"Hello, World!" + assert ("Content-Type", "text/plain") in response.headers + + +def test_open_uri_file(app, tmp_path): + uri = urllib.parse.urlunsplit( + ( + "file", + None, + urllib.request.pathname2url(str(tmp_path / "test.db")), + None, + None, + ) + ) + response = call_app(app, environ={"PASTE_DB": uri}) + assert response.status == "200 OK" + assert response.data == b"Hello, World!" + assert ("Content-Type", "text/plain") in response.headers + + +def test_open_file(app, tmp_path): + response = call_app(app, environ={"PASTE_DB": str(tmp_path / "test.db")}) + assert response.status == "200 OK" + assert response.data == b"Hello, World!" + assert ("Content-Type", "text/plain") in response.headers diff --git a/tests/middleware/test_options.py b/tests/middleware/test_options.py new file mode 100644 index 0000000..8cf48ea --- /dev/null +++ b/tests/middleware/test_options.py @@ -0,0 +1,43 @@ +from wsgiref.validate import validator + +import pytest + +from paste import options + +from ..common_wsgi import call_app + + +@pytest.fixture +def app(): + @validator + @options + @validator + def app(_, start_response): + start_response("200 OK", [("Content-Type", "text/plain")]) + return [b"Hello, world!"] + + return app + + +@pytest.mark.parametrize("method", ["GET", "HEAD", "POST", "PUT", "DELETE"]) +def test_non_options(app, method): + environ = {"REQUEST_METHOD": method} + response = call_app(app, environ) + assert response.data == b"Hello, world!" + assert response.status == "200 OK" + assert ("Content-Type", "text/plain") in response.headers + + +def test_options(app): + environ = {"REQUEST_METHOD": "OPTIONS"} + response = call_app(app, environ) + assert response.data == b"" + assert response.status == "204 No Content" + allow = None + for k, v in response.headers: + if k != "Allow": + continue + allow = v.split(", ") + assert allow is not None, "Must contain an Allow header" + assert len(set(allow)) == len(allow), "Allow header must not contain duplicates" + assert set(allow) == {"GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS"} diff --git a/tests/middleware/test_validate_method.py b/tests/middleware/test_validate_method.py new file mode 100644 index 0000000..1a51bca --- /dev/null +++ b/tests/middleware/test_validate_method.py @@ -0,0 +1,44 @@ +from wsgiref.validate import validator + +import pytest + +from paste import validate_method + +from ..common_wsgi import call_app + + +@pytest.fixture +def app(): + @validator + @validate_method + @validator + def app(_, start_response): + start_response("200 OK", [("Content-type", "text/plain")]) + return [b"Hello, world!"] + + return app + + +@pytest.mark.parametrize("method", ["GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS"]) +def test_valid_method(app, method): + environ = {"REQUEST_METHOD": method} + response = call_app(app, environ) + assert response.status == "200 OK" + assert response.data == b"Hello, world!" + + +@pytest.mark.filterwarnings("ignore:Unknown REQUEST_METHOD") +@pytest.mark.parametrize("method", ["CONNECT", "PATCH", "TRACE"]) +def test_disallowed_method(app, method): + environ = {"REQUEST_METHOD": method} + response = call_app(app, environ) + assert response.status == "405 Method Not Allowed" + assert response.data == b"405 Method Not Allowed\n" + + +@pytest.mark.filterwarnings("ignore:Unknown REQUEST_METHOD") +def test_invalid_method(app): + environ = {"REQUEST_METHOD": "INVALID"} + response = call_app(app, environ) + assert response.status == "501 Not Implemented" + assert response.data == b"501 Not Implemented\n" -- cgit v1.2.3-54-g00ecf