#!/usr/bin/env python3
"""
RSS Feed Poller for Local Radio Station.

Polls configured RSS feeds, detects new episodes, downloads audio,
and enqueues them for Liquidsoap playback.

Designed to run as a long-lived systemd service.
"""
|
import hashlib
import logging
import os
import re
import shutil
import signal
import sqlite3
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import urlparse

import feedparser
import requests
import yaml

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# Root of the installation; override with LOCALRADIO_BASE for testing.
BASE_DIR = Path(os.environ.get("LOCALRADIO_BASE", "/opt/localradio"))
CONFIG_DIR = BASE_DIR / "config"
STATE_DIR = BASE_DIR / "state"
# Directory Liquidsoap watches for queued audio (symlinks / hard links / copies).
QUEUE_DIR = STATE_DIR / "queue"
# Downloaded podcast audio, one subdirectory per feed.
PODCAST_DIR = BASE_DIR / "media" / "podcasts"
DB_PATH = STATE_DIR / "radio.db"
FEEDS_CONFIG = CONFIG_DIR / "feeds.yaml"

POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "300"))  # seconds between poll cycles
DOWNLOAD_TIMEOUT = int(os.environ.get("DOWNLOAD_TIMEOUT", "600"))  # seconds
USER_AGENT = "LocalRadio/1.0 (Personal Podcast Poller)"
|
# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------

LOG_DIR = BASE_DIR / "logs"
LOG_DIR.mkdir(parents=True, exist_ok=True)

# Log to both stdout (captured by journald under systemd) and a file.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler(LOG_DIR / "poller.log"),
    ],
)
log = logging.getLogger("poller")
|
# ---------------------------------------------------------------------------
# Graceful shutdown
# ---------------------------------------------------------------------------

# Set by the signal handler; polled by the main loop and by long-running
# downloads so the process can stop at the next safe checkpoint.
_shutdown = False


def _handle_signal(signum, frame):
    """Signal handler: request a graceful stop (no hard exit)."""
    global _shutdown
    log.info("Received signal %s, shutting down gracefully...", signum)
    _shutdown = True


# Stop cleanly on systemd stop (SIGTERM) and Ctrl-C (SIGINT).
signal.signal(signal.SIGTERM, _handle_signal)
signal.signal(signal.SIGINT, _handle_signal)
|
# ---------------------------------------------------------------------------
# Database helpers
# ---------------------------------------------------------------------------


def get_db() -> sqlite3.Connection:
    """Open the radio database with name-based row access, WAL, and FK checks."""
    connection = sqlite3.connect(str(DB_PATH))
    connection.row_factory = sqlite3.Row
    for pragma in ("PRAGMA journal_mode=WAL", "PRAGMA foreign_keys=ON"):
        connection.execute(pragma)
    return connection
|
|
|
|
|
def episode_exists(conn: sqlite3.Connection, feed_name: str, guid: str) -> bool:
    """Return True if an episode with this (feed_name, guid) pair is recorded."""
    query = "SELECT 1 FROM episodes WHERE feed_name=? AND guid=?"
    found = conn.execute(query, (feed_name, guid)).fetchone()
    return found is not None
|
|
|
|
|
|
def insert_episode(
|
|
conn: sqlite3.Connection,
|
|
feed_name: str,
|
|
guid: str,
|
|
title: str,
|
|
url: str,
|
|
pub_date: str | None,
|
|
) -> int:
|
|
cur = conn.execute(
|
|
"""INSERT INTO episodes (feed_name, guid, title, url, pub_date)
|
|
VALUES (?, ?, ?, ?, ?)""",
|
|
(feed_name, guid, title, url, pub_date),
|
|
)
|
|
conn.commit()
|
|
return cur.lastrowid
|
|
|
|
|
|
def mark_downloaded(conn: sqlite3.Connection, episode_id: int, file_path: str) -> None:
    """Flag an episode as downloaded and remember where its audio file lives."""
    params = (file_path, episode_id)
    conn.execute("UPDATE episodes SET downloaded=1, file_path=? WHERE id=?", params)
    conn.commit()
|
|
|
|
|
|
def enqueue_episode(conn: sqlite3.Connection, episode_id: int) -> None:
    """Append an episode to the playback queue and mark it queued.

    The position is one past the current maximum, so playback order
    follows insertion order.
    """
    (next_pos,) = conn.execute(
        "SELECT COALESCE(MAX(position), 0) + 1 FROM queue"
    ).fetchone()
    conn.execute(
        "INSERT OR IGNORE INTO queue (episode_id, position) VALUES (?, ?)",
        (episode_id, next_pos),
    )
    conn.execute("UPDATE episodes SET queued=1 WHERE id=?", (episode_id,))
    conn.commit()
|
|
|
|
|
|
def get_feed_state(conn: sqlite3.Connection, feed_name: str) -> dict | None:
|
|
row = conn.execute(
|
|
"SELECT * FROM feed_state WHERE feed_name=?", (feed_name,)
|
|
).fetchone()
|
|
return dict(row) if row else None
|
|
|
|
|
|
def update_feed_state(
|
|
conn: sqlite3.Connection,
|
|
feed_name: str,
|
|
etag: str | None = None,
|
|
modified: str | None = None,
|
|
) -> None:
|
|
conn.execute(
|
|
"""INSERT INTO feed_state (feed_name, last_poll, last_etag, last_modified)
|
|
VALUES (?, datetime('now'), ?, ?)
|
|
ON CONFLICT(feed_name) DO UPDATE SET
|
|
last_poll=datetime('now'),
|
|
last_etag=COALESCE(?, last_etag),
|
|
last_modified=COALESCE(?, last_modified)""",
|
|
(feed_name, etag, modified, etag, modified),
|
|
)
|
|
conn.commit()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Feed parsing
# ---------------------------------------------------------------------------


def load_feeds() -> list[dict]:
    """Load enabled feed definitions from the YAML config.

    Returns an empty list when the config file is empty or has no usable
    'feeds' key. A feed is enabled unless it sets 'enabled: false'.
    """
    with open(FEEDS_CONFIG) as f:
        # BUGFIX: yaml.safe_load returns None for an empty file; guard it
        # (and a null 'feeds' key) instead of crashing on .get/iteration.
        data = yaml.safe_load(f) or {}
    feeds = data.get("feeds") or []
    return [feed for feed in feeds if feed.get("enabled", True)]
|
|
|
|
|
|
def extract_audio_url(entry) -> str | None:
|
|
"""Extract the audio enclosure URL from a feed entry."""
|
|
# Check enclosures first
|
|
for enc in getattr(entry, "enclosures", []):
|
|
etype = enc.get("type", "")
|
|
if etype.startswith("audio/") or enc.get("url", "").endswith(
|
|
(".mp3", ".m4a", ".ogg", ".opus", ".wav", ".flac")
|
|
):
|
|
return enc.get("url") or enc.get("href")
|
|
|
|
# Check media content
|
|
for media in getattr(entry, "media_content", []):
|
|
mtype = media.get("type", "")
|
|
if mtype.startswith("audio/"):
|
|
return media.get("url")
|
|
|
|
# Check links
|
|
for link in getattr(entry, "links", []):
|
|
if link.get("type", "").startswith("audio/"):
|
|
return link.get("href")
|
|
|
|
return None
|
|
|
|
|
|
def extract_guid(entry) -> str:
    """Get a stable unique identifier for an entry.

    Prefers the feed-supplied id; otherwise hashes the audio URL, and as
    a last resort hashes the title.
    """
    entry_id = getattr(entry, "id", None)
    if entry_id:
        return entry_id
    audio_url = extract_audio_url(entry)
    basis = audio_url if audio_url else getattr(entry, "title", "")
    return hashlib.sha256(basis.encode()).hexdigest()
|
|
|
|
|
|
def extract_pub_date(entry) -> str | None:
|
|
if hasattr(entry, "published_parsed") and entry.published_parsed:
|
|
try:
|
|
return datetime(*entry.published_parsed[:6], tzinfo=timezone.utc).isoformat()
|
|
except Exception:
|
|
pass
|
|
return getattr(entry, "published", None)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Downloading
# ---------------------------------------------------------------------------


def sanitize_filename(name: str) -> str:
    """Replace filesystem-hostile characters and whitespace with underscores.

    The result is truncated to 200 characters so it stays a safe filename.
    """
    forbidden = str.maketrans({ch: "_" for ch in '<>:"/\\|?*'})
    without_specials = name.translate(forbidden)
    without_spaces = re.sub(r"\s+", "_", without_specials)
    return without_spaces[:200]
|
|
|
|
|
|
def download_episode(url: str, feed_name: str, title: str) -> Path | None:
    """Download an audio file. Returns the local path or None on failure.

    The payload is streamed to '<dest>.part' and renamed into place only on
    success, so a crash or abort never leaves a partial file at the final
    path. If the destination already exists it is returned without
    re-downloading. A shutdown signal mid-transfer aborts the download,
    removes the partial file, and returns None.
    """
    feed_dir = PODCAST_DIR / sanitize_filename(feed_name)
    feed_dir.mkdir(parents=True, exist_ok=True)

    # Determine filename from URL or title
    parsed = urlparse(url)
    url_filename = Path(parsed.path).name
    if not url_filename or len(url_filename) < 3:
        # URL path gives no usable name; fall back to the episode title.
        # NOTE(review): assumes the audio is mp3 in that case — unverified.
        url_filename = sanitize_filename(title) + ".mp3"

    dest = feed_dir / sanitize_filename(url_filename)

    if dest.exists():
        log.info("File already exists: %s", dest)
        return dest

    # e.g. "episode.mp3" -> "episode.mp3.part"
    tmp = dest.with_suffix(dest.suffix + ".part")
    try:
        log.info("Downloading: %s", url)
        with requests.get(
            url,
            stream=True,
            timeout=DOWNLOAD_TIMEOUT,
            headers={"User-Agent": USER_AGENT},
        ) as resp:
            resp.raise_for_status()
            with open(tmp, "wb") as f:
                for chunk in resp.iter_content(chunk_size=65536):
                    # Check the shutdown flag between chunks so a SIGTERM
                    # doesn't have to wait for a multi-hundred-MB transfer.
                    if _shutdown:
                        log.warning("Shutdown during download, aborting")
                        tmp.unlink(missing_ok=True)
                        return None
                    f.write(chunk)
        # Atomic publish: readers never see a half-written file at `dest`.
        tmp.rename(dest)
        log.info("Downloaded: %s (%.1f MB)", dest, dest.stat().st_size / 1048576)
        return dest
    except Exception:
        # Any failure (HTTP error, timeout, disk) -> log, drop partial, None.
        log.exception("Failed to download %s", url)
        tmp.unlink(missing_ok=True)
        return None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Queue management
# ---------------------------------------------------------------------------


def link_to_queue(file_path: Path, episode_id: int) -> None:
    """Create a symlink in the queue directory for Liquidsoap to pick up.

    The link name is prefixed with a timestamp (plus the zero-padded
    episode id) so FIFO ordering holds when Liquidsoap reads the directory
    alphabetically. Falls back to a hard link, then to a plain copy, when
    linking is not possible (e.g. Windows without admin, cross-device).
    """
    QUEUE_DIR.mkdir(parents=True, exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    link_name = f"{timestamp}_{episode_id:06d}_{file_path.name}"
    link_path = QUEUE_DIR / link_name
    try:
        link_path.symlink_to(file_path.resolve())
        log.info("Queued (symlink): %s -> %s", link_name, file_path)
    # FileExistsError is a subclass of OSError, so OSError alone covers both.
    except OSError:
        try:
            link_path.hardlink_to(file_path.resolve())
            log.info("Queued (hardlink): %s -> %s", link_name, file_path)
        except OSError:
            # Last resort: copy the file
            shutil.copy2(str(file_path), str(link_path))
            log.info("Queued (copy): %s -> %s", link_name, file_path)
|
|
|
|
|
|
def cleanup_played_queue_files(conn: sqlite3.Connection) -> None:
    """Remove queue symlinks whose targets are gone (already played).

    Liquidsoap in 'normal' mode plays files and moves on; a broken symlink
    means the target was consumed or deleted, so drop the link. `conn` is
    accepted for interface symmetry with the other helpers but is currently
    unused.
    """
    if not QUEUE_DIR.exists():
        return
    for item in sorted(QUEUE_DIR.iterdir()):
        # Path.exists() follows symlinks and reports False for a broken
        # link, so the resolve() round-trip is unnecessary.
        if item.is_symlink() and not item.exists():
            log.info("Removing broken queue link: %s", item.name)
            item.unlink()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Main poll cycle
# ---------------------------------------------------------------------------


def poll_feed(conn: sqlite3.Connection, feed_cfg: dict) -> int:
    """Poll a single feed. Returns number of new episodes found.

    Sends the stored ETag / Last-Modified so an unchanged feed answers 304
    and costs nothing. Each unseen entry with an audio URL is recorded,
    downloaded, and enqueued. Entries whose download fails are logged and
    skipped; since the episode row is inserted before the download, they
    are not retried on the next poll.
    """
    name = feed_cfg["name"]
    url = feed_cfg["url"]
    new_count = 0

    log.info("Polling feed: %s", name)
    state = get_feed_state(conn, name)

    # Conditional-GET parameters from the previous successful poll.
    kwargs = {}
    if state:
        if state.get("last_etag"):
            kwargs["etag"] = state["last_etag"]
        if state.get("last_modified"):
            kwargs["modified"] = state["last_modified"]

    try:
        feed = feedparser.parse(url, agent=USER_AGENT, **kwargs)
    except Exception:
        log.exception("Failed to parse feed: %s", name)
        return 0

    # bozo with zero entries means the feed was unusable; bozo alone can be
    # a recoverable well-formedness nit, so only bail when nothing parsed.
    if feed.bozo and not feed.entries:
        log.warning("Feed error for %s: %s", name, feed.bozo_exception)
        return 0

    # Update feed state
    etag = getattr(feed, "etag", None)
    modified = getattr(feed, "modified", None)
    update_feed_state(conn, name, etag, modified)

    # BUGFIX: 'status' is only present on the result for HTTP(S) fetches;
    # a bare feed.status raised AttributeError for non-HTTP sources and
    # some error paths. getattr keeps the 304 short-circuit safe.
    if getattr(feed, "status", None) == 304:
        log.info("Feed %s: not modified", name)
        return 0

    for entry in feed.entries:
        guid = extract_guid(entry)
        if episode_exists(conn, name, guid):
            continue

        audio_url = extract_audio_url(entry)
        if not audio_url:
            log.debug("No audio URL in entry: %s", getattr(entry, "title", "unknown"))
            continue

        title = getattr(entry, "title", "Untitled")
        pub_date = extract_pub_date(entry)

        episode_id = insert_episode(conn, name, guid, title, audio_url, pub_date)
        log.info("New episode: [%s] %s", name, title)

        # Download
        file_path = download_episode(audio_url, name, title)
        if _shutdown:
            return new_count

        if file_path:
            mark_downloaded(conn, episode_id, str(file_path))
            enqueue_episode(conn, episode_id)
            link_to_queue(file_path, episode_id)
            new_count += 1
        else:
            log.error("Skipping episode (download failed): %s", title)

    return new_count
|
|
|
|
|
|
def poll_all_feeds() -> None:
    """Run one poll cycle over every enabled feed, then tidy the queue dir."""
    enabled = load_feeds()
    if not enabled:
        log.warning("No enabled feeds configured in %s", FEEDS_CONFIG)
        return

    conn = get_db()
    try:
        discovered = 0
        for cfg in enabled:
            if _shutdown:
                break
            try:
                discovered += poll_feed(conn, cfg)
            except Exception:
                # One bad feed must not abort the whole cycle.
                log.exception("Error processing feed: %s", cfg.get("name"))

        cleanup_played_queue_files(conn)

        if discovered > 0:
            log.info("Poll complete: %d new episode(s) enqueued", discovered)
        else:
            log.info("Poll complete: no new episodes")
    finally:
        conn.close()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------


def main() -> None:
    """Entry point: set up directories and the DB, then poll until signalled."""
    log.info("Local Radio RSS Poller starting (interval=%ds)", POLL_INTERVAL)
    log.info("Config: %s", FEEDS_CONFIG)
    log.info("Database: %s", DB_PATH)
    log.info("Queue dir: %s", QUEUE_DIR)

    # Ensure directories exist
    for directory in (QUEUE_DIR, PODCAST_DIR):
        directory.mkdir(parents=True, exist_ok=True)

    # Ensure DB is initialized
    from init_db import init_db

    init_db(DB_PATH)

    while not _shutdown:
        try:
            poll_all_feeds()
        except Exception:
            log.exception("Unexpected error in poll cycle")

        # Sleep one second at a time so shutdown signals are honoured quickly.
        remaining = POLL_INTERVAL
        while remaining > 0 and not _shutdown:
            time.sleep(1)
            remaining -= 1

    log.info("Poller shut down cleanly")
|
|
|
|
|
|
# Run the poller only when executed as a script, not on import.
if __name__ == "__main__":
    main()
|