518 lines
18 KiB
Python
518 lines
18 KiB
Python
from fastapi import FastAPI, UploadFile, File, HTTPException
|
|
from html.parser import HTMLParser
|
|
from fastapi.responses import JSONResponse, HTMLResponse
|
|
import json as json_lib
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from pydantic import BaseModel
|
|
from typing import Optional, List
|
|
import aiosqlite
|
|
import shutil
|
|
import os
|
|
import re as re_mod
|
|
import ipaddress
|
|
import httpx
|
|
|
|
# ASGI application object; every route in this module registers on it.
app = FastAPI()

# Wide-open CORS policy: any origin, any HTTP method and any request header
# is accepted.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_origins=["*"],
)
|
|
|
|
# On-disk locations used by the rest of the module.
FAVICON_DIR = "/data/favicons"
DB_PATH = "/data/startpage.db"

# Make sure the favicon directory exists at import time; an already
# existing directory is not an error.
os.makedirs(name=FAVICON_DIR, exist_ok=True)
|
|
|
|
class CategoryIn(BaseModel):
    """Request body accepted by the category create and update routes."""

    # Display name; the only required field.
    name: str
    # Icon identifier, stored as given; may be omitted.
    icon: Optional[str] = None
    # Sort key; categories are listed in ascending position order.
    position: Optional[int] = 0
|
|
|
|
class SubcategoryIn(BaseModel):
    """Request body accepted by the subcategory create and update routes."""

    # Id of the owning row in the categories table.
    category_id: int
    # Display name.
    name: str
    # Sort key within the owning category (ascending).
    position: Optional[int] = 0
|
|
|
|
class LinkIn(BaseModel):
    """Request body accepted by the link create and update routes."""

    # Id of the owning row in the subcategories table.
    subcategory_id: int
    # Text shown for the link.
    label: str
    # Target address, stored as given.
    url: str
    # Favicon reference, stored as given; may be omitted.
    favicon: Optional[str] = None
    # Sort key within the owning subcategory (ascending).
    position: Optional[int] = 0
|
|
|
|
class ReorderIn(BaseModel):
    """Request body for the reorder route."""

    # Which table to reorder: "categories", "subcategories", "links"
    type: str
    # Row ids in their new order; the list index becomes the position.
    ids: List[int]
|
|
|
|
async def init_db():
    """Create the categories, subcategories and links tables if missing.

    All statements run on one connection opened on ``DB_PATH`` and are
    committed together at the end.
    """
    schema_statements = (
        """
        CREATE TABLE IF NOT EXISTS categories (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            icon TEXT,
            position INTEGER DEFAULT 0
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS subcategories (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            category_id INTEGER NOT NULL,
            name TEXT NOT NULL,
            position INTEGER DEFAULT 0,
            FOREIGN KEY (category_id) REFERENCES categories(id) ON DELETE CASCADE
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS links (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            subcategory_id INTEGER NOT NULL,
            label TEXT NOT NULL,
            url TEXT NOT NULL,
            favicon TEXT,
            position INTEGER DEFAULT 0,
            FOREIGN KEY (subcategory_id) REFERENCES subcategories(id) ON DELETE CASCADE
        )
        """,
    )
    async with aiosqlite.connect(DB_PATH) as conn:
        # Parents first, so each REFERENCES clause names an existing table.
        for ddl in schema_statements:
            await conn.execute(ddl)
        # NOTE(review): this pragma is issued after the DDL on a connection
        # that is closed right afterwards; the routes that rely on cascades
        # issue it again on their own connections.
        await conn.execute("PRAGMA foreign_keys = ON")
        await conn.commit()
|
|
|
|
@app.on_event("startup")
async def startup():
    """Startup hook: ensure the database schema exists."""
    await init_db()
|
|
|
|
@app.get("/categories")
async def get_categories():
    """Return every category row as a dict, ordered by position."""
    query = "SELECT * FROM categories ORDER BY position"
    async with aiosqlite.connect(DB_PATH) as conn:
        # Row objects convert cleanly to column-name -> value dicts.
        conn.row_factory = aiosqlite.Row
        async with conn.execute(query) as cur:
            records = await cur.fetchall()
    return [dict(record) for record in records]
|
|
|
|
@app.post("/categories")
async def create_category(cat: CategoryIn):
    """Insert a category and return its new id together with the payload."""
    values = (cat.name, cat.icon, cat.position)
    async with aiosqlite.connect(DB_PATH) as conn:
        inserted = await conn.execute(
            "INSERT INTO categories (name, icon, position) VALUES (?, ?, ?)",
            values,
        )
        await conn.commit()
        created = {"id": inserted.lastrowid}
        # Payload fields are merged after the id, as in a dict unpacking.
        created.update(cat.dict())
        return created
|
|
|
|
@app.put("/categories/{cat_id}")
async def update_category(cat_id: int, cat: CategoryIn):
    """Overwrite name, icon and position of the category ``cat_id``.

    Returns the id merged with the submitted payload.

    Raises:
        HTTPException: 404 when no category with ``cat_id`` exists.
    """
    async with aiosqlite.connect(DB_PATH) as db:
        cursor = await db.execute(
            "UPDATE categories SET name=?, icon=?, position=? WHERE id=?",
            (cat.name, cat.icon, cat.position, cat_id)
        )
        await db.commit()
        # rowcount is the number of rows the UPDATE matched; zero means the
        # id is unknown, which must not be reported as a successful update.
        updated = cursor.rowcount
    if updated == 0:
        raise HTTPException(status_code=404, detail="Category not found")
    return {"id": cat_id, **cat.dict()}
|
|
|
|
@app.delete("/categories/{cat_id}")
async def delete_category(cat_id: int):
    """Delete the category ``cat_id`` and echo the id back."""
    async with aiosqlite.connect(DB_PATH) as conn:
        # Foreign keys are switched on for this connection before the delete
        # so the ON DELETE CASCADE clauses declared in the schema apply.
        await conn.execute("PRAGMA foreign_keys = ON")
        await conn.execute("DELETE FROM categories WHERE id=?", (cat_id,))
        await conn.commit()
    return {"deleted": cat_id}
|
|
|
|
@app.get("/subcategories/{cat_id}")
async def get_subcategories(cat_id: int):
    """Return the subcategories of category ``cat_id``, ordered by position."""
    query = "SELECT * FROM subcategories WHERE category_id=? ORDER BY position"
    async with aiosqlite.connect(DB_PATH) as conn:
        conn.row_factory = aiosqlite.Row
        async with conn.execute(query, (cat_id,)) as cur:
            records = await cur.fetchall()
    return [dict(record) for record in records]
|
|
|
|
@app.post("/subcategories")
async def create_subcategory(sub: SubcategoryIn):
    """Insert a subcategory and return its new id together with the payload.

    Raises:
        HTTPException: 400 when ``sub.category_id`` does not reference an
            existing category.
    """
    async with aiosqlite.connect(DB_PATH) as db:
        # SQLite enforces foreign keys only on connections that enable them;
        # without this an unknown category_id would create an orphan row.
        await db.execute("PRAGMA foreign_keys = ON")
        try:
            cursor = await db.execute(
                "INSERT INTO subcategories (category_id, name, position) VALUES (?, ?, ?)",
                (sub.category_id, sub.name, sub.position)
            )
        except aiosqlite.IntegrityError as exc:
            raise HTTPException(status_code=400, detail="Unknown category_id") from exc
        await db.commit()
        return {"id": cursor.lastrowid, **sub.dict()}
|
|
|
|
@app.put("/subcategories/{sub_id}")
async def update_subcategory(sub_id: int, sub: SubcategoryIn):
    """Overwrite name and position of the subcategory ``sub_id``.

    Returns the id merged with the submitted payload.

    Raises:
        HTTPException: 404 when no subcategory with ``sub_id`` exists.
    """
    async with aiosqlite.connect(DB_PATH) as db:
        # NOTE(review): category_id is part of the payload and is echoed in
        # the response, but it is not written here, so a subcategory cannot
        # be moved to another category through this route. Confirm intent.
        cursor = await db.execute(
            "UPDATE subcategories SET name=?, position=? WHERE id=?",
            (sub.name, sub.position, sub_id)
        )
        await db.commit()
        # Zero matched rows means the id is unknown.
        updated = cursor.rowcount
    if updated == 0:
        raise HTTPException(status_code=404, detail="Subcategory not found")
    return {"id": sub_id, **sub.dict()}
|
|
|
|
@app.delete("/subcategories/{sub_id}")
async def delete_subcategory(sub_id: int):
    """Delete the subcategory ``sub_id`` and echo the id back."""
    async with aiosqlite.connect(DB_PATH) as conn:
        # Foreign keys are switched on for this connection before the delete
        # so the ON DELETE CASCADE clause on links applies.
        await conn.execute("PRAGMA foreign_keys = ON")
        await conn.execute("DELETE FROM subcategories WHERE id=?", (sub_id,))
        await conn.commit()
    return {"deleted": sub_id}
|
|
|
|
@app.get("/links/{sub_id}")
async def get_links(sub_id: int):
    """Return the links of subcategory ``sub_id``, ordered by position."""
    query = "SELECT * FROM links WHERE subcategory_id=? ORDER BY position"
    async with aiosqlite.connect(DB_PATH) as conn:
        conn.row_factory = aiosqlite.Row
        async with conn.execute(query, (sub_id,)) as cur:
            records = await cur.fetchall()
    return [dict(record) for record in records]
|
|
|
|
@app.post("/links")
async def create_link(link: LinkIn):
    """Insert a link and return its new id together with the payload.

    Raises:
        HTTPException: 400 when ``link.subcategory_id`` does not reference
            an existing subcategory.
    """
    async with aiosqlite.connect(DB_PATH) as db:
        # SQLite enforces foreign keys only on connections that enable them;
        # without this an unknown subcategory_id would create an orphan row.
        await db.execute("PRAGMA foreign_keys = ON")
        try:
            cursor = await db.execute(
                "INSERT INTO links (subcategory_id, label, url, favicon, position) VALUES (?, ?, ?, ?, ?)",
                (link.subcategory_id, link.label, link.url, link.favicon, link.position)
            )
        except aiosqlite.IntegrityError as exc:
            raise HTTPException(status_code=400, detail="Unknown subcategory_id") from exc
        await db.commit()
        return {"id": cursor.lastrowid, **link.dict()}
|
|
|
|
@app.put("/links/{link_id}")
async def update_link(link_id: int, link: LinkIn):
    """Overwrite every stored field of the link ``link_id``.

    Returns the id merged with the submitted payload.

    Raises:
        HTTPException: 404 when no link with ``link_id`` exists; 400 when
            ``link.subcategory_id`` does not reference an existing
            subcategory.
    """
    async with aiosqlite.connect(DB_PATH) as db:
        # Enable foreign-key enforcement so the link cannot be re-parented
        # onto a subcategory that does not exist.
        await db.execute("PRAGMA foreign_keys = ON")
        try:
            cursor = await db.execute(
                "UPDATE links SET subcategory_id=?, label=?, url=?, favicon=?, position=? WHERE id=?",
                (link.subcategory_id, link.label, link.url, link.favicon, link.position, link_id)
            )
        except aiosqlite.IntegrityError as exc:
            raise HTTPException(status_code=400, detail="Unknown subcategory_id") from exc
        await db.commit()
        # Zero matched rows means the id is unknown.
        updated = cursor.rowcount
    if updated == 0:
        raise HTTPException(status_code=404, detail="Link not found")
    return {"id": link_id, **link.dict()}
|
|
|
|
@app.delete("/links/{link_id}")
async def delete_link(link_id: int):
    """Delete the link ``link_id`` and echo the id back."""
    statement = "DELETE FROM links WHERE id=?"
    async with aiosqlite.connect(DB_PATH) as conn:
        await conn.execute(statement, (link_id,))
        await conn.commit()
    return {"deleted": link_id}
|
|
|
|
@app.put("/reorder")
async def reorder(data: ReorderIn):
    """Rewrite positions so rows follow the order of ``data.ids``.

    The index of each id in the list becomes that row's position.

    Raises:
        HTTPException: 400 when ``data.type`` is not a known table name.
    """
    reorderable = ("categories", "subcategories", "links")
    if data.type not in reorderable:
        raise HTTPException(status_code=400, detail="Invalid type")

    # Interpolating the table name is safe here: it was just checked against
    # the fixed tuple above. The values themselves stay parameterised.
    statement = f"UPDATE {data.type} SET position=? WHERE id=?"
    assignments = list(enumerate(data.ids))

    async with aiosqlite.connect(DB_PATH) as conn:
        await conn.executemany(statement, assignments)
        await conn.commit()
    return {"reordered": data.type, "count": len(data.ids)}
|
|
|
|
@app.post("/favicons/upload")
async def upload_favicon(file: UploadFile = File(...)):
    """Store an uploaded favicon in ``FAVICON_DIR`` and return its file name.

    The stored name is the final path component of the client-supplied name,
    lower-cased and with spaces replaced by underscores.

    Raises:
        HTTPException: 400 when the upload carries no usable file name.
    """
    # The file name comes from the client. Treat both separator styles as
    # path separators and keep only the last component, so names such as
    # "../../x" or "/etc/x" cannot escape FAVICON_DIR.
    client_name = (file.filename or "").replace("\\", "/")
    filename = os.path.basename(client_name).replace(" ", "_").lower()
    if filename in ("", ".", "..") or "\x00" in filename:
        raise HTTPException(status_code=400, detail="Invalid filename")

    dest = os.path.join(FAVICON_DIR, filename)
    with open(dest, "wb") as f:
        shutil.copyfileobj(file.file, f)
    return {"favicon": filename}
|
|
|
|
|
|
@app.get("/export/json")
async def export_json():
    """Return all categories, subcategories and links as a JSON download.

    The payload is a list of category dicts; each carries a
    ``subcategories`` list whose items carry a ``links`` list. Every level
    is ordered by position.
    """
    async with aiosqlite.connect(DB_PATH) as conn:
        conn.row_factory = aiosqlite.Row

        async def fetch_dicts(sql, params=()):
            # Run one query and materialise its rows as plain dicts.
            async with conn.execute(sql, params) as cur:
                return [dict(record) for record in await cur.fetchall()]

        tree = await fetch_dicts("SELECT * FROM categories ORDER BY position")
        for category in tree:
            children = await fetch_dicts(
                "SELECT * FROM subcategories WHERE category_id=? ORDER BY position",
                (category["id"],),
            )
            for child in children:
                child["links"] = await fetch_dicts(
                    "SELECT * FROM links WHERE subcategory_id=? ORDER BY position",
                    (child["id"],),
                )
            category["subcategories"] = children

    download_headers = {
        "Content-Disposition": "attachment; filename=startpage-bookmarks.json"
    }
    return JSONResponse(content=tree, headers=download_headers)
|
|
|
|
@app.get("/export/html")
async def export_html():
    """Return all bookmarks as a Netscape bookmark-file download.

    Categories become top-level ``<H3>`` folders, subcategories become
    nested folders and links become ``<A>`` entries. Every level is ordered
    by position.
    """
    from html import escape

    async with aiosqlite.connect(DB_PATH) as db:
        db.row_factory = aiosqlite.Row
        async with db.execute("SELECT * FROM categories ORDER BY position") as cur:
            categories = [dict(r) for r in await cur.fetchall()]
        for cat in categories:
            async with db.execute(
                "SELECT * FROM subcategories WHERE category_id=? ORDER BY position",
                (cat["id"],)
            ) as cur:
                subs = [dict(r) for r in await cur.fetchall()]
            for sub in subs:
                async with db.execute(
                    "SELECT * FROM links WHERE subcategory_id=? ORDER BY position",
                    (sub["id"],)
                ) as cur:
                    sub["links"] = [dict(r) for r in await cur.fetchall()]
            cat["subcategories"] = subs

    lines = [
        "<!DOCTYPE NETSCAPE-Bookmark-file-1>",
        "<!-- This is an automatically generated file. -->",
        "<META HTTP-EQUIV=\"Content-Type\" CONTENT=\"text/html; charset=UTF-8\">",
        "<TITLE>Bookmarks</TITLE>",
        "<H1>Bookmarks</H1>",
        "<DL><p>",
    ]
    # Names, labels and URLs are user-supplied text. They are escaped so
    # characters such as <, & or " cannot break the markup or inject tags;
    # escape() also covers quotes, which matters inside the HREF attribute.
    for cat in categories:
        lines.append(" <DT><H3>" + escape(cat["name"]) + "</H3>")
        lines.append(" <DL><p>")
        for sub in cat["subcategories"]:
            lines.append(" <DT><H3>" + escape(sub["name"]) + "</H3>")
            lines.append(" <DL><p>")
            for link in sub["links"]:
                href = escape(link["url"])
                label = escape(link["label"])
                lines.append(" <DT><A HREF=\"" + href + "\">" + label + "</A>")
            lines.append(" </DL><p>")
        lines.append(" </DL><p>")
    lines.append("</DL>")

    body = "\n".join(lines)
    headers = {"Content-Disposition": "attachment; filename=startpage-bookmarks.html"}
    return HTMLResponse(content=body, headers=headers)
|
|
|
|
|
|
class BookmarkParser(HTMLParser):
    """Parse a Netscape bookmark file into categories, subcategories and links.

    After feeding, ``categories`` is a list of
    ``{"name": str, "subcategories": [{"name": str, "links": [...]}]}``
    where each link is ``{"label": str, "url": str}``.

    Mapping rules:
      * an ``<H3>`` seen while no category is open starts a category;
      * an ``<H3>`` seen inside an open category starts a subcategory;
      * a link directly inside a category goes to a "General" subcategory;
      * a link seen outside every folder goes to an "Imported" category
        (subcategory "General") instead of being an error.
    """

    def __init__(self):
        super().__init__()
        self.categories = []
        self._cat = None            # category currently open, if any
        self._sub = None            # subcategory currently receiving links
        self._depth = 0             # not read by the parser; kept for compatibility
        self._in_h3 = False
        self._in_a = False
        self._current_href = None
        self._dl_depth = 0          # nesting level of open <DL> elements
        self._cat_dl_depth = None   # <DL> level whose close ends the category
        self._sub_dl_depth = None   # <DL> level whose close ends the subcategory
        self._loose_links = None    # link list of the "Imported" fallback

    def handle_starttag(self, tag, attrs):
        """Track <DL> nesting and note when an <H3> or <A> is open."""
        attrs = dict(attrs)
        if tag == 'dl':
            self._dl_depth += 1
        elif tag == 'h3':
            self._in_h3 = True
        elif tag == 'a':
            self._in_a = True
            self._current_href = attrs.get('href', '')

    def handle_endtag(self, tag):
        """Close the category or subcategory whose <DL> just ended."""
        if tag == 'dl':
            if self._cat_dl_depth is not None and self._dl_depth == self._cat_dl_depth:
                self._cat = None
                # Also drop the current subcategory; otherwise links placed
                # directly in the next category would be appended to this
                # category's last subcategory.
                self._sub = None
                self._cat_dl_depth = None
                self._sub_dl_depth = None
            elif self._sub_dl_depth is not None and self._dl_depth == self._sub_dl_depth:
                self._sub = None
                self._sub_dl_depth = None
            self._dl_depth -= 1
        elif tag == 'h3':
            self._in_h3 = False
        elif tag == 'a':
            self._in_a = False
            self._current_href = None

    def handle_data(self, data):
        """Turn folder titles and link texts into entries of ``categories``."""
        data = data.strip()
        if not data:
            return
        if self._in_h3:
            if self._cat is None:
                self._cat = {"name": data, "subcategories": []}
                self.categories.append(self._cat)
                # The folder's contents live in the next <DL> to open.
                self._cat_dl_depth = self._dl_depth + 1
            else:
                self._sub = {"name": data, "links": []}
                self._cat["subcategories"].append(self._sub)
                self._sub_dl_depth = self._dl_depth + 1
        elif self._in_a and self._current_href:
            link = {"label": data, "url": self._current_href}
            if self._cat is None:
                # Link outside every folder: collect it in a fallback
                # category without opening that category, so later <H3>
                # folders still become categories of their own.
                if self._loose_links is None:
                    self._loose_links = []
                    self.categories.append({
                        "name": "Imported",
                        "subcategories": [
                            {"name": "General", "links": self._loose_links},
                        ],
                    })
                self._loose_links.append(link)
                return
            if self._sub is None:
                subs = self._cat["subcategories"]
                if not subs or subs[-1]["name"] != "General":
                    self._sub = {"name": "General", "links": []}
                    subs.append(self._sub)
                else:
                    self._sub = subs[-1]
            self._sub["links"].append(link)
|
|
|
|
|
|
@app.post("/import/html")
async def import_html(file: UploadFile = File(...)):
    """Import an uploaded bookmark HTML file into the database.

    The upload is decoded as UTF-8 (undecodable bytes are replaced), parsed
    with ``BookmarkParser`` and inserted in one transaction. Imported
    categories are appended after the existing ones; subcategories and
    links are numbered from 0 within their parent.

    Returns:
        ``{"imported": {"categories": n, "subcategories": n, "links": n}}``
    """
    content = (await file.read()).decode("utf-8", errors="replace")
    parser = BookmarkParser()
    parser.feed(content)
    # Flush any text the parser is still buffering at end of input.
    parser.close()

    imported_cats = 0
    imported_subs = 0
    imported_links = 0

    async with aiosqlite.connect(DB_PATH) as db:
        await db.execute("PRAGMA foreign_keys = ON")

        async with db.execute("SELECT COALESCE(MAX(position), -1) FROM categories") as cur:
            row = await cur.fetchone()
        # COALESCE already maps "no categories yet" to -1. An extra
        # `or -1` would also turn a legitimate maximum of 0 into -1 and
        # reuse position 0 for the first imported category.
        cat_pos = row[0] + 1

        for cat_data in parser.categories:
            cursor = await db.execute(
                "INSERT INTO categories (name, icon, position) VALUES (?, ?, ?)",
                (cat_data["name"], "ti-folder", cat_pos)
            )
            cat_id = cursor.lastrowid
            cat_pos += 1
            imported_cats += 1

            for sub_pos, sub_data in enumerate(cat_data["subcategories"]):
                cursor = await db.execute(
                    "INSERT INTO subcategories (category_id, name, position) VALUES (?, ?, ?)",
                    (cat_id, sub_data["name"], sub_pos)
                )
                sub_id = cursor.lastrowid
                imported_subs += 1

                for link_pos, link_data in enumerate(sub_data["links"]):
                    await db.execute(
                        "INSERT INTO links (subcategory_id, label, url, favicon, position) VALUES (?, ?, ?, ?, ?)",
                        (sub_id, link_data["label"], link_data["url"], None, link_pos)
                    )
                    imported_links += 1

        await db.commit()

    return {
        "imported": {
            "categories": imported_cats,
            "subcategories": imported_subs,
            "links": imported_links
        }
    }
|
|
|
|
|
|
class IconLinkParser(HTMLParser):
    """Collect ``<link>`` elements whose ``rel`` mentions "icon".

    After feeding, ``icons`` holds one dict per match, in document order:
    ``{"href": str, "sizes": str, "rel": str}`` with ``rel`` lower-cased and
    ``href`` stripped of surrounding whitespace. Links without an href are
    skipped.
    """

    def __init__(self):
        super().__init__()
        self.icons = []

    def handle_starttag(self, tag, attrs):
        """Record the tag if it is an icon ``<link>`` with a non-empty href."""
        if tag != 'link':
            return
        attrs = dict(attrs)
        # HTMLParser reports valueless attributes (e.g. `<link rel>`) as
        # None, so `or ''` is needed before calling string methods.
        rel = (attrs.get('rel') or '').lower()
        if 'icon' not in rel:
            return
        href = (attrs.get('href') or '').strip()
        sizes = attrs.get('sizes') or ''
        if href:
            self.icons.append({'href': href, 'sizes': sizes, 'rel': rel})
|
|
|
|
|
|
def _is_private_url(url: str) -> bool:
|
|
"""Return True for Tailscale, .local, or private IP URLs — skip fetch for these."""
|
|
if '.ts.net' in url or '.local' in url:
|
|
return True
|
|
try:
|
|
from urllib.parse import urlparse
|
|
host = urlparse(url).hostname or ''
|
|
addr = ipaddress.ip_address(host)
|
|
return addr.is_private or addr.is_loopback
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
def _pick_best_icon(icons: list, base_url: str) -> str | None:
|
|
"""Pick highest-res icon, resolve relative URLs, return absolute URL or None."""
|
|
from urllib.parse import urljoin, urlparse
|
|
if not icons:
|
|
return None
|
|
# Prefer explicit sizes, larger first
|
|
def size_score(icon):
|
|
s = icon.get('sizes', '')
|
|
m = re_mod.search(r'(\d+)', s)
|
|
return int(m.group(1)) if m else 0
|
|
icons_sorted = sorted(icons, key=size_score, reverse=True)
|
|
href = icons_sorted[0]['href']
|
|
if href.startswith('data:'):
|
|
return None
|
|
return urljoin(base_url, href)
|
|
|
|
|
|
def _domain_filename(url: str) -> str:
|
|
from urllib.parse import urlparse
|
|
host = urlparse(url).hostname or 'unknown'
|
|
# strip www.
|
|
host = re_mod.sub(r'^www\.', '', host)
|
|
return host + '.png'
|
|
|
|
|
|
@app.post("/favicons/fetch")
async def fetch_favicon(payload: dict):
    """Download and cache the favicon for ``payload["url"]``.

    The icon is stored in ``FAVICON_DIR`` under the name produced by
    ``_domain_filename``. If that file already exists, no network request
    is made. Otherwise the page is fetched and its ``<link rel="icon">``
    tags inspected; if none is usable, ``/favicon.ico`` on the same origin
    is tried.

    Returns:
        ``{"favicon": <file name>}`` on success, ``{"favicon": None}`` when
        the URL is missing, malformed, private, not http(s), or no icon
        could be downloaded. This route is best-effort and does not raise
        for fetch failures.
    """
    import logging
    from urllib.parse import urlparse

    log = logging.getLogger(__name__)

    # The body is an arbitrary JSON object, so "url" may be absent or not a
    # string.
    raw_url = payload.get('url')
    url = raw_url.strip() if isinstance(raw_url, str) else ''
    if not url or _is_private_url(url):
        return {"favicon": None}

    try:
        parsed = urlparse(url)
    except ValueError:
        # e.g. a malformed IPv6 literal; nothing sensible to fetch.
        return {"favicon": None}

    filename = _domain_filename(url)
    dest = os.path.join(FAVICON_DIR, filename)

    # If already cached, return immediately
    if os.path.exists(dest):
        return {"favicon": filename}

    if parsed.scheme not in ('http', 'https') or not parsed.netloc:
        return {"favicon": None}
    base_url = f"{parsed.scheme}://{parsed.netloc}"
    icon_url = None

    headers = {"User-Agent": "Mozilla/5.0 (compatible; favicon-fetcher/1.0)"}
    try:
        async with httpx.AsyncClient(follow_redirects=True, headers=headers) as client:
            # Try to parse the page for <link rel="icon"> — short timeout
            try:
                resp = await client.get(url, timeout=5)
                if resp.status_code == 200 and 'html' in resp.headers.get('content-type', ''):
                    parser = IconLinkParser()
                    parser.feed(resp.text[:20000])
                    # Relative hrefs are relative to the page that was
                    # actually served (after redirects), not the site root.
                    icon_url = _pick_best_icon(parser.icons, str(resp.url))
            except Exception:
                # A failed page fetch or parse only loses the hint; the
                # /favicon.ico fallback below is still attempted.
                log.info("favicon: could not inspect %s", url, exc_info=True)

            # Fall back to /favicon.ico
            if not icon_url:
                icon_url = base_url + '/favicon.ico'

            # Download the icon — separate timeout
            icon_resp = await client.get(icon_url, timeout=8)
            icon_type = icon_resp.headers.get('content-type', '')
            # Some sites answer 200 with an HTML error page for a missing
            # icon; do not cache that as an image.
            if (
                icon_resp.status_code == 200
                and icon_resp.content
                and 'html' not in icon_type
            ):
                with open(dest, 'wb') as f:
                    f.write(icon_resp.content)
                return {"favicon": filename}
    except Exception:
        # Best-effort: report "no icon" instead of failing the request, but
        # leave a trace for debugging.
        log.info("favicon: download failed for %s", url, exc_info=True)

    return {"favicon": None}
|