aboutsummaryrefslogtreecommitdiffstats
path: root/paste/__main__.py
diff options
context:
space:
mode:
authorTomasz Kramkowski <tomasz@kramkow.ski>2023-02-14 18:36:54 +0000
committerTomasz Kramkowski <tomasz@kramkow.ski>2023-02-14 18:36:54 +0000
commitac0b1401574711a26d494a80c03807e422cc0853 (patch)
tree19d1440d05c6448e8d5065c6aa4270747e418020 /paste/__main__.py
parent0d143bb226e702006d6929c346b8d25a05e10b2e (diff)
downloadpaste-ac0b1401574711a26d494a80c03807e422cc0853.tar.gz
paste-ac0b1401574711a26d494a80c03807e422cc0853.tar.xz
paste-ac0b1401574711a26d494a80c03807e422cc0853.zip
First draft version
Diffstat (limited to 'paste/__main__.py')
-rw-r--r--paste/__main__.py140
1 files changed, 140 insertions, 0 deletions
diff --git a/paste/__main__.py b/paste/__main__.py
new file mode 100644
index 0000000..f1aa117
--- /dev/null
+++ b/paste/__main__.py
@@ -0,0 +1,140 @@
+from base64 import b64encode, b64decode
+from datetime import datetime, timezone
+from hashlib import sha256
+from os import getenv
+from secrets import token_bytes
+from sys import argv, stderr
+from wsgiref.simple_server import make_server
+from sqlite3 import Connection
+from contextlib import AbstractContextManager
+
+from . import application, DB_PATH
+from . import db
+
+TOKEN_BYTES = 96 // 8
+PROGRAM_NAME = "paste"
+
+
+def print_usage() -> None:
+ print(
+ f"Usage: {PROGRAM_NAME} [-h|serve|new-token|list-tokens|delete-token]",
+ file=stderr,
+ )
+
+
+def print_help() -> None:
+ print(
+ f"""A simple WSGI paste site
+
+Commands:
+ serve (default) Start a basic HTTP server (do NOT use in production)
+ new-token Generate a new access token
+ list-tokens List access token hashes (sha256 hash, created at)
+ check-token Check if a token is valid and exists in the database
+ delete-token Delete a token (specify token or hash)
+
+Options:
+ -h Show this help
+
+Environment:
+ PASTE_HOST The HTTP server host (default: localhost)
+ PASTE_PORT The HTTP server port (default: 8080)
+ PASTE_DB The path to the sqlite3 database (default: {DB_PATH})""",
+ file=stderr,
+ )
+
+
+def generate_token(conn: Connection):
+ token = token_bytes(TOKEN_BYTES)
+ token_hash = sha256(token).digest()
+ with conn:
+ conn.execute("INSERT INTO token (hash) VALUES (?)", (token_hash,))
+ return token
+
+
+def get_tokens(conn: Connection):
+ return conn.execute(
+ "SELECT hash, created_at FROM token ORDER BY created_at"
+ ).fetchall()
+
+
+def delete_token_hash(conn: Connection, token_hash: bytes):
+ if len(token_hash) != 256 // 8:
+ raise ValueError("Invalid token hash")
+ with conn:
+ cur = conn.execute("DELETE FROM token WHERE hash = ?", (token_hash,))
+ if cur.rowcount <= 0:
+ raise KeyError("Token hash does not exist")
+
+
+def delete_token(conn: Connection, token: bytes):
+ if len(token) != TOKEN_BYTES:
+ raise ValueError("Invalid token")
+ return delete_token_hash(conn, sha256(token).digest())
+
+
+def check_token(conn: Connection, token: bytes):
+ if len(token) != TOKEN_BYTES:
+ raise ValueError("Invalid token")
+ token_hash = sha256(token).digest()
+ (count,) = conn.execute(
+ "SELECT COUNT(*) FROM token WHERE hash = ?", (token_hash,)
+ ).fetchone()
+ return count > 0
+
+
+db_path = getenv("PASTE_DB", DB_PATH)
+
+
+def connect() -> AbstractContextManager[Connection]:
+ return db.connect(db_path)
+
+
+def main():
+ if len(argv) <= 1 or len(argv) == 2 and argv[1] == "serve":
+ host = getenv("PASTE_HOST", "localhost")
+ port = int(getenv("PASTE_PORT", "8080"))
+ httpd = make_server(host, port, application)
+ httpd.serve_forever()
+ elif len(argv) == 2:
+ if argv[1] == "new-token":
+ with connect() as conn:
+ print(b64encode(generate_token(conn)).decode())
+ elif argv[1] == "list-tokens":
+ with connect() as conn:
+ for token_hash, created_at in get_tokens(conn):
+ created_at = datetime.fromtimestamp(created_at, timezone.utc)
+ print(f"{token_hash.hex()}\t{created_at.ctime()}")
+ elif argv[1] == "-h" or argv[1] == "--help":
+ print_usage()
+ print_help()
+ else:
+ print_usage()
+ exit(1)
+ elif len(argv) == 3:
+ if argv[1] == "delete-token":
+ token = argv[2]
+ with connect() as conn:
+ try:
+ try:
+ delete_token(conn, b64decode(token))
+ except ValueError:
+ delete_token_hash(conn, bytes.fromhex(token))
+ except ValueError:
+ print("Malformed token", file=stderr)
+ exit(1)
+ except KeyError:
+ print("Token not found", file=stderr)
+ exit(1)
+ elif argv[1] == "verify-token":
+ with connect() as conn:
+ try:
+ if not check_token(conn, b64decode(argv[2])):
+ print("Token not found", file=stderr)
+ exit(1)
+ print("Found")
+ except ValueError:
+ print("Malformed token", file=stderr)
+
+
+main()