aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTomasz Kramkowski <tomasz@kramkow.ski>2023-03-24 19:45:17 +0000
committerTomasz Kramkowski <tomasz@kramkow.ski>2023-03-24 20:25:01 +0000
commit7bc36edb7b15cae90bbfa81187b63821466cdd59 (patch)
tree0c96857d0e77eee107ed792ddcea286e0fd14816
parentf29abe40fb1eb063ef67651361a2f3f851b25ed1 (diff)
downloadpaste-7bc36edb7b15cae90bbfa81187b63821466cdd59.tar.gz
paste-7bc36edb7b15cae90bbfa81187b63821466cdd59.tar.xz
paste-7bc36edb7b15cae90bbfa81187b63821466cdd59.zip
add middleware tests
-rw-r--r--tests/common_wsgi.py50
-rw-r--r--tests/middleware/__init__.py0
-rw-r--r--tests/middleware/test_authenticate.py116
-rw-r--r--tests/middleware/test_catch_exceptions.py37
-rw-r--r--tests/middleware/test_if_none_match.py138
-rw-r--r--tests/middleware/test_open_database.py55
-rw-r--r--tests/middleware/test_options.py43
-rw-r--r--tests/middleware/test_validate_method.py44
8 files changed, 483 insertions, 0 deletions
diff --git a/tests/common_wsgi.py b/tests/common_wsgi.py
new file mode 100644
index 0000000..21e8deb
--- /dev/null
+++ b/tests/common_wsgi.py
@@ -0,0 +1,50 @@
+from dataclasses import dataclass
+from types import TracebackType
+from typing import Optional, Union
+from wsgiref.util import setup_testing_defaults
+
+
+@dataclass
+class Response:
+ data: bytes
+ status: str
+ headers: list[tuple[str, str]]
+ exc_info: Union[
+ None,
+ tuple[type[BaseException], BaseException, TracebackType],
+ tuple[None, None, None],
+ ]
+
+
+def call_app(app, environ={}):
+ def write(_):
+ raise AssertionError("write was called, but should not have been")
+
+ status = None
+ headers = None
+ exc_info = None
+
+ def start_response(s, h, e_i=None):
+ nonlocal status, headers, exc_info
+ if isinstance(headers, list) and e_i is None:
+ raise AssertionError("Start response called twice")
+ status = s
+ headers = h
+ exc_info = e_i
+ return write
+
+ setup_testing_defaults(environ)
+ if "QUERY_STRING" not in environ:
+ environ["QUERY_STRING"] = ""
+ resp_iter = app(environ, start_response)
+ assert isinstance(status, str)
+ assert isinstance(headers, list)
+ resp = b"".join(resp_iter)
+ if hasattr(resp_iter, "close"):
+ resp_iter.close()
+ return Response(
+ resp,
+ status,
+ headers,
+ exc_info,
+ )
diff --git a/tests/middleware/__init__.py b/tests/middleware/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/middleware/__init__.py
diff --git a/tests/middleware/test_authenticate.py b/tests/middleware/test_authenticate.py
new file mode 100644
index 0000000..28ccef2
--- /dev/null
+++ b/tests/middleware/test_authenticate.py
@@ -0,0 +1,116 @@
+from base64 import b64encode
+from wsgiref.validate import validator
+
+import pytest
+
+import paste
+from paste import authenticate
+
+from ..common_wsgi import call_app
+
+
+@pytest.fixture
+def app():
+ @validator
+ @authenticate
+ @validator
+ def app(_, start_response):
+ start_response("200 OK", [("Content-Type", "text/plain")])
+ return [b"Hello, world!"]
+
+ return app
+
+
+@pytest.mark.parametrize("method", ["GET", "HEAD"])
+def test_unauthenticated_request(app, method, monkeypatch):
+ monkeypatch.delattr(paste, "check_token")
+ environ = {"REQUEST_METHOD": method}
+ response = call_app(app, environ)
+ assert response.data == b"Hello, world!"
+ assert response.status == "200 OK"
+ assert ("Content-Type", "text/plain") in response.headers
+
+
+@pytest.mark.parametrize("method", ["GET", "HEAD"])
+def test_unauthenticated_request_with_key(app, method, monkeypatch):
+ monkeypatch.delattr(paste, "check_token")
+ environ = {
+ "REQUEST_METHOD": method,
+ "paste.db_conn": None,
+ "HTTP_AUTHORIZATION": "ApiKey AAAA",
+ }
+ response = call_app(app, environ)
+ assert response.data == b"Hello, world!"
+ assert response.status == "200 OK"
+ assert ("Content-Type", "text/plain") in response.headers
+
+
+@pytest.mark.parametrize("method", ["POST", "PUT", "DELETE"])
+def test_authenticate_no_header(app, method, monkeypatch):
+ monkeypatch.delattr(paste, "check_token")
+ environ = {"REQUEST_METHOD": method}
+ response = call_app(app, environ)
+ assert response.data == b"401 Unauthorized\n"
+ assert response.status == "401 Unauthorized"
+ assert ("Content-Type", "text/plain") in response.headers
+
+
+@pytest.mark.parametrize("method", ["POST", "PUT", "DELETE"])
+@pytest.mark.parametrize("key", ["ApiKey AAAA", "APIKey AAA", "APIKey AAAA", "AAAA"])
+def test_authenticate_malformed_key(app, method, key, monkeypatch):
+ monkeypatch.delattr(paste, "check_token")
+ environ = {"REQUEST_METHOD": method, "HTTP_AUTHORIZATION": key}
+ response = call_app(app, environ)
+ assert response.data == b"401 Unauthorized\n"
+ assert response.status == "401 Unauthorized"
+ assert ("Content-Type", "text/plain") in response.headers
+
+
+@pytest.mark.parametrize("method", ["POST", "PUT", "DELETE"])
+def test_authenticate_check_token_fail(app, method, monkeypatch):
+ check_token_called = False
+ token = b"test"
+ environ = {
+ "REQUEST_METHOD": method,
+ "paste.db_conn": object(),
+ "HTTP_AUTHORIZATION": f"APIKey {b64encode(token).decode()}",
+ }
+
+ def check_token(conn, tok):
+ nonlocal check_token_called
+ assert conn == environ["paste.db_conn"]
+ assert tok == token
+ check_token_called = True
+ return False
+
+ monkeypatch.setattr(paste, "check_token", check_token)
+ response = call_app(app, environ)
+ assert check_token_called
+ assert response.data == b"401 Unauthorized\n"
+ assert response.status == "401 Unauthorized"
+ assert ("Content-Type", "text/plain") in response.headers
+
+
+@pytest.mark.parametrize("method", ["POST", "PUT", "DELETE"])
+def test_authenticate_check_token_success(app, method, monkeypatch):
+ check_token_called = False
+ token = b"test"
+ environ = {
+ "REQUEST_METHOD": method,
+ "paste.db_conn": object(),
+ "HTTP_AUTHORIZATION": f"APIKey {b64encode(token).decode()}",
+ }
+
+ def check_token(conn, tok):
+ nonlocal check_token_called
+ assert conn == environ["paste.db_conn"]
+ assert tok == token
+ check_token_called = True
+ return True
+
+ monkeypatch.setattr(paste, "check_token", check_token)
+ response = call_app(app, environ)
+ assert check_token_called
+ assert response.data == b"Hello, world!"
+ assert response.status == "200 OK"
+ assert ("Content-Type", "text/plain") in response.headers
diff --git a/tests/middleware/test_catch_exceptions.py b/tests/middleware/test_catch_exceptions.py
new file mode 100644
index 0000000..5042f7f
--- /dev/null
+++ b/tests/middleware/test_catch_exceptions.py
@@ -0,0 +1,37 @@
+from wsgiref.validate import validator
+
+from paste import catch_exceptions
+
+from ..common_wsgi import call_app
+
+
+def test_no_exception():
+ @validator
+ @catch_exceptions
+ @validator
+ def app(_, start_response):
+ start_response("200 OK", [("Content-Type", "text/plain")])
+ return [b"Hello, world!"]
+
+ response = call_app(app)
+ assert response.data == b"Hello, world!"
+ assert response.status == "200 OK"
+ assert ("Content-Type", "text/plain") in response.headers
+
+
+def test_exception(capfd):
+ @validator
+ @catch_exceptions
+ @validator
+ def app(_, start_response):
+ start_response("200 OK", [("Content-Type", "text/plain")])
+ raise Exception("Something went wrong")
+
+ response = call_app(app)
+ assert response.data == b"500 Internal Server Error\n"
+ assert response.status == "500 Internal Server Error"
+ assert ("Content-Type", "text/plain") in response.headers
+ out, _ = capfd.readouterr()
+ assert "Exception: Something went wrong\n" in out
+ assert response.exc_info is not None
+ assert isinstance(response.exc_info[1], Exception)
diff --git a/tests/middleware/test_if_none_match.py b/tests/middleware/test_if_none_match.py
new file mode 100644
index 0000000..8456c05
--- /dev/null
+++ b/tests/middleware/test_if_none_match.py
@@ -0,0 +1,138 @@
+from wsgiref.validate import validator
+
+import pytest
+
+from paste import if_none_match
+
+from ..common_wsgi import call_app
+
+ETAG = "1234"
+
+
+@pytest.fixture
+def app():
+ @validator
+ @if_none_match
+ @validator
+ def app(environ, start_response):
+ body = b"Hello, world!"
+ etag_header = ("ETag", f'"{ETAG}"')
+ if environ["REQUEST_METHOD"] == "HEAD":
+ start_response(
+ "200 OK",
+ [
+ ("Content-Type", "text/plain"),
+ ("Content-Length", str(len(body))),
+ etag_header,
+ ],
+ )
+ return []
+ start_response("200 OK", [("Content-Type", "text/plain"), etag_header])
+ return [body]
+
+ return app
+
+
+@pytest.mark.parametrize("method", ["POST", "PUT", "DELETE"])
+def test_non_get_or_head_request_passthrough(app, method):
+ environ = {"REQUEST_METHOD": method, "HTTP_IF_NONE_MATCH": f'"{ETAG}"'}
+ response = call_app(app, environ)
+ assert response.data == b"Hello, world!"
+ assert response.status == "200 OK"
+ assert ("Content-Type", "text/plain") in response.headers
+
+
+def test_if_none_match_header_not_set(app):
+ environ = {"REQUEST_METHOD": "GET"}
+ response = call_app(app, environ)
+ assert response.data == b"Hello, world!"
+ assert response.status == "200 OK"
+ assert ("Content-Type", "text/plain") in response.headers
+
+
+@pytest.mark.parametrize("value", [f'"{ETAG}"', f'W/"{ETAG}"'])
+def test_etag_matches_if_none_match_header(app, value):
+ environ = {"REQUEST_METHOD": "GET", "HTTP_IF_NONE_MATCH": value}
+ response = call_app(app, environ)
+ assert response.data == b""
+ assert response.status == "304 Not Modified"
+
+
+@pytest.mark.parametrize(
+ "value",
+ [
+ f'"{ETAG}", "54321"',
+ f'"54321", "{ETAG}", "098765"',
+ f'"54321", "{ETAG}"',
+ f'W/"{ETAG}", "54321"',
+ f'"54321", W/"{ETAG}", "098765"',
+ f'"54321", W/"{ETAG}"',
+ ],
+)
+def test_etag_matches_if_none_match_header_list(app, value):
+ environ = {"REQUEST_METHOD": "GET", "HTTP_IF_NONE_MATCH": value}
+ response = call_app(app, environ)
+ assert response.data == b""
+ assert response.status == "304 Not Modified"
+
+
+def test_etag_matches_if_none_match_header_list2(app):
+ environ = {"REQUEST_METHOD": "GET", "HTTP_IF_NONE_MATCH": f'"54321", "{ETAG}"'}
+ response = call_app(app, environ)
+ assert response.data == b""
+ assert response.status == "304 Not Modified"
+
+
+def test_etag_matches_if_none_match_header_star(app):
+ environ = {"REQUEST_METHOD": "GET", "HTTP_IF_NONE_MATCH": "*"}
+ response = call_app(app, environ)
+ assert response.data == b""
+ assert response.status == "304 Not Modified"
+
+
+def test_missing_etag_does_not_match_if_none_match_header_star():
+ @validator
+ @if_none_match
+ @validator
+ def app(environ, start_response):
+ body = b"Hello, world!"
+ if environ["REQUEST_METHOD"] == "HEAD":
+ start_response(
+ "200 OK",
+ [
+ ("Content-Type", "text/plain"),
+ ("Content-Length", str(len(body))),
+ ],
+ )
+ return []
+ start_response("200 OK", [("Content-Type", "text/plain")])
+ return [body]
+
+ environ = {"REQUEST_METHOD": "GET", "HTTP_IF_NONE_MATCH": "*"}
+ response = call_app(app, environ)
+ assert response.data == b"Hello, world!"
+ assert response.status == "200 OK"
+
+
+def test_etag_does_not_match_if_none_match_header(app):
+ environ = {"REQUEST_METHOD": "GET", "HTTP_IF_NONE_MATCH": '"54321"'}
+ response = call_app(app, environ)
+ assert response.data == b"Hello, world!"
+ assert response.status == "200 OK"
+ assert ("Content-Type", "text/plain") in response.headers
+
+
+def test_etag_does_not_match_if_none_match_header_list(app):
+ environ = {"REQUEST_METHOD": "GET", "HTTP_IF_NONE_MATCH": '"123","4567","*"'}
+ response = call_app(app, environ)
+ assert response.data == b"Hello, world!"
+ assert response.status == "200 OK"
+ assert ("Content-Type", "text/plain") in response.headers
+
+
+def test_etag_does_not_match_if_none_match_malformed_header(app):
+ environ = {"REQUEST_METHOD": "GET", "HTTP_IF_NONE_MATCH": "{ETAG}"}
+ response = call_app(app, environ)
+ assert response.data == b"400 Bad Request\n"
+ assert response.status == "400 Bad Request"
+ assert ("Content-Type", "text/plain") in response.headers
diff --git a/tests/middleware/test_open_database.py b/tests/middleware/test_open_database.py
new file mode 100644
index 0000000..43ebb07
--- /dev/null
+++ b/tests/middleware/test_open_database.py
@@ -0,0 +1,55 @@
+import urllib.parse
+import urllib.request
+from wsgiref.validate import validator
+
+import pytest
+
+from paste import open_database
+
+from ..common_wsgi import call_app
+
+
+@pytest.fixture
+def app():
+ @validator
+ @open_database
+ @validator
+ def app(environ, start_response):
+ assert "paste.db_conn" in environ
+ conn = environ["paste.db_conn"]
+ (ver,) = conn.execute("PRAGMA user_version").fetchone()
+ assert ver > 0
+ start_response("200 OK", [("Content-Type", "text/plain")])
+ return [b"Hello, World!"]
+
+ return app
+
+
+def test_open_uri_memory(app):
+ response = call_app(app, environ={"PASTE_DB": "file::memory:"})
+ assert response.status == "200 OK"
+ assert response.data == b"Hello, World!"
+ assert ("Content-Type", "text/plain") in response.headers
+
+
+def test_open_uri_file(app, tmp_path):
+ uri = urllib.parse.urlunsplit(
+ (
+ "file",
+ None,
+ urllib.request.pathname2url(str(tmp_path / "test.db")),
+ None,
+ None,
+ )
+ )
+ response = call_app(app, environ={"PASTE_DB": uri})
+ assert response.status == "200 OK"
+ assert response.data == b"Hello, World!"
+ assert ("Content-Type", "text/plain") in response.headers
+
+
+def test_open_file(app, tmp_path):
+ response = call_app(app, environ={"PASTE_DB": str(tmp_path / "test.db")})
+ assert response.status == "200 OK"
+ assert response.data == b"Hello, World!"
+ assert ("Content-Type", "text/plain") in response.headers
diff --git a/tests/middleware/test_options.py b/tests/middleware/test_options.py
new file mode 100644
index 0000000..8cf48ea
--- /dev/null
+++ b/tests/middleware/test_options.py
@@ -0,0 +1,43 @@
+from wsgiref.validate import validator
+
+import pytest
+
+from paste import options
+
+from ..common_wsgi import call_app
+
+
+@pytest.fixture
+def app():
+ @validator
+ @options
+ @validator
+ def app(_, start_response):
+ start_response("200 OK", [("Content-Type", "text/plain")])
+ return [b"Hello, world!"]
+
+ return app
+
+
+@pytest.mark.parametrize("method", ["GET", "HEAD", "POST", "PUT", "DELETE"])
+def test_non_options(app, method):
+ environ = {"REQUEST_METHOD": method}
+ response = call_app(app, environ)
+ assert response.data == b"Hello, world!"
+ assert response.status == "200 OK"
+ assert ("Content-Type", "text/plain") in response.headers
+
+
+def test_options(app):
+ environ = {"REQUEST_METHOD": "OPTIONS"}
+ response = call_app(app, environ)
+ assert response.data == b""
+ assert response.status == "204 No Content"
+ allow = None
+ for k, v in response.headers:
+ if k != "Allow":
+ continue
+ allow = v.split(", ")
+ assert allow is not None, "Must contain an Allow header"
+ assert len(set(allow)) == len(allow), "Allow header must not contain duplicates"
+ assert set(allow) == {"GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS"}
diff --git a/tests/middleware/test_validate_method.py b/tests/middleware/test_validate_method.py
new file mode 100644
index 0000000..1a51bca
--- /dev/null
+++ b/tests/middleware/test_validate_method.py
@@ -0,0 +1,44 @@
+from wsgiref.validate import validator
+
+import pytest
+
+from paste import validate_method
+
+from ..common_wsgi import call_app
+
+
+@pytest.fixture
+def app():
+ @validator
+ @validate_method
+ @validator
+ def app(_, start_response):
+ start_response("200 OK", [("Content-type", "text/plain")])
+ return [b"Hello, world!"]
+
+ return app
+
+
+@pytest.mark.parametrize("method", ["GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS"])
+def test_valid_method(app, method):
+ environ = {"REQUEST_METHOD": method}
+ response = call_app(app, environ)
+ assert response.status == "200 OK"
+ assert response.data == b"Hello, world!"
+
+
+@pytest.mark.filterwarnings("ignore:Unknown REQUEST_METHOD")
+@pytest.mark.parametrize("method", ["CONNECT", "PATCH", "TRACE"])
+def test_disallowed_method(app, method):
+ environ = {"REQUEST_METHOD": method}
+ response = call_app(app, environ)
+ assert response.status == "405 Method Not Allowed"
+ assert response.data == b"405 Method Not Allowed\n"
+
+
+@pytest.mark.filterwarnings("ignore:Unknown REQUEST_METHOD")
+def test_invalid_method(app):
+ environ = {"REQUEST_METHOD": "INVALID"}
+ response = call_app(app, environ)
+ assert response.status == "501 Not Implemented"
+ assert response.data == b"501 Not Implemented\n"