#!/usr/bin/env python3 """ RSS Feed Poller for Local Radio Station. Polls configured RSS feeds, detects new episodes, downloads audio, and enqueues them for Liquidsoap playback. Designed to run as a long-lived systemd service. """ import hashlib import logging import os import re import shutil import signal import sqlite3 import sys import time from datetime import datetime, timezone from pathlib import Path from urllib.parse import urlparse import feedparser import requests import yaml # --------------------------------------------------------------------------- # Configuration # --------------------------------------------------------------------------- BASE_DIR = Path(os.environ.get("LOCALRADIO_BASE", "/opt/localradio")) CONFIG_DIR = BASE_DIR / "config" STATE_DIR = BASE_DIR / "state" QUEUE_DIR = STATE_DIR / "queue" PODCAST_DIR = BASE_DIR / "media" / "podcasts" DB_PATH = STATE_DIR / "radio.db" FEEDS_CONFIG = CONFIG_DIR / "feeds.yaml" POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "300")) # seconds DOWNLOAD_TIMEOUT = int(os.environ.get("DOWNLOAD_TIMEOUT", "600")) # seconds USER_AGENT = "LocalRadio/1.0 (Personal Podcast Poller)" # --------------------------------------------------------------------------- # Logging # --------------------------------------------------------------------------- LOG_DIR = BASE_DIR / "logs" LOG_DIR.mkdir(parents=True, exist_ok=True) logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", handlers=[ logging.StreamHandler(sys.stdout), logging.FileHandler(LOG_DIR / "poller.log"), ], ) log = logging.getLogger("poller") # --------------------------------------------------------------------------- # Graceful shutdown # --------------------------------------------------------------------------- _shutdown = False def _handle_signal(signum, frame): global _shutdown log.info("Received signal %s, shutting down gracefully...", signum) _shutdown = True signal.signal(signal.SIGTERM, _handle_signal) 
signal.signal(signal.SIGINT, _handle_signal)


# ---------------------------------------------------------------------------
# Database helpers
# ---------------------------------------------------------------------------


def get_db() -> sqlite3.Connection:
    """Open the state database.

    WAL journaling lets a concurrent reader (status scripts etc.) inspect
    the database while this process writes.  Foreign-key enforcement is
    per-connection in SQLite, so it is enabled on every open.
    """
    conn = sqlite3.connect(str(DB_PATH))
    conn.row_factory = sqlite3.Row  # column access by name
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA foreign_keys=ON")
    return conn


def episode_exists(conn: sqlite3.Connection, feed_name: str, guid: str) -> bool:
    """Return True if this (feed, guid) pair has already been recorded."""
    row = conn.execute(
        "SELECT 1 FROM episodes WHERE feed_name=? AND guid=?", (feed_name, guid)
    ).fetchone()
    return row is not None


def insert_episode(
    conn: sqlite3.Connection,
    feed_name: str,
    guid: str,
    title: str,
    url: str,
    pub_date: str | None,
) -> int:
    """Record a newly discovered episode and return its row id."""
    cur = conn.execute(
        """INSERT INTO episodes (feed_name, guid, title, url, pub_date)
           VALUES (?, ?, ?, ?, ?)""",
        (feed_name, guid, title, url, pub_date),
    )
    conn.commit()
    return cur.lastrowid


def mark_downloaded(conn: sqlite3.Connection, episode_id: int, file_path: str) -> None:
    """Flag an episode as downloaded and remember its local file path."""
    conn.execute(
        "UPDATE episodes SET downloaded=1, file_path=? WHERE id=?",
        (file_path, episode_id),
    )
    conn.commit()


def enqueue_episode(conn: sqlite3.Connection, episode_id: int) -> None:
    """Append an episode to the playback queue.

    Position is MAX(position) + 1 — fine for this single-writer process,
    not safe for concurrent writers.
    """
    row = conn.execute("SELECT COALESCE(MAX(position), 0) + 1 FROM queue").fetchone()
    next_pos = row[0]
    conn.execute(
        "INSERT OR IGNORE INTO queue (episode_id, position) VALUES (?, ?)",
        (episode_id, next_pos),
    )
    conn.execute("UPDATE episodes SET queued=1 WHERE id=?", (episode_id,))
    conn.commit()


def get_feed_state(conn: sqlite3.Connection, feed_name: str) -> dict | None:
    """Return persisted poll state (last_poll/etag/modified) for a feed, or None."""
    row = conn.execute(
        "SELECT * FROM feed_state WHERE feed_name=?", (feed_name,)
    ).fetchone()
    return dict(row) if row else None


def update_feed_state(
    conn: sqlite3.Connection,
    feed_name: str,
    etag: str | None = None,
    modified: str | None = None,
) -> None:
    """Upsert poll state; None values keep the previously stored validators."""
    conn.execute(
        """INSERT INTO feed_state (feed_name, last_poll, last_etag, last_modified)
           VALUES (?, datetime('now'), ?, ?)
           ON CONFLICT(feed_name) DO UPDATE SET
               last_poll=datetime('now'),
               last_etag=COALESCE(?, last_etag),
               last_modified=COALESCE(?, last_modified)""",
        (feed_name, etag, modified, etag, modified),
    )
    conn.commit()


# ---------------------------------------------------------------------------
# Feed parsing
# ---------------------------------------------------------------------------

# Extensions accepted as audio when an enclosure lacks an audio/* MIME type.
AUDIO_EXTENSIONS = (".mp3", ".m4a", ".ogg", ".opus", ".wav", ".flac")


def load_feeds() -> list[dict]:
    """Load enabled feed definitions from feeds.yaml.

    Feeds default to enabled unless 'enabled: false' is set.  An empty
    config file (safe_load returns None) yields an empty list instead of
    raising AttributeError.
    """
    with open(FEEDS_CONFIG) as f:
        data = yaml.safe_load(f) or {}
    feeds = data.get("feeds", [])
    return [f for f in feeds if f.get("enabled", True)]


def extract_audio_url(entry) -> str | None:
    """Extract the audio enclosure URL from a feed entry.

    Checks enclosures first, then media:content, then plain links.
    Returns None when no audio URL can be found.
    """
    for enc in getattr(entry, "enclosures", []):
        etype = enc.get("type", "")
        # feedparser normalizes enclosure URLs under 'href'; some raw dicts
        # use 'url'.  Check both so the extension fallback actually fires.
        enc_url = enc.get("url") or enc.get("href") or ""
        if etype.startswith("audio/") or enc_url.endswith(AUDIO_EXTENSIONS):
            return enc_url or None
    # media:content elements (Media RSS)
    for media in getattr(entry, "media_content", []):
        mtype = media.get("type", "")
        if mtype.startswith("audio/"):
            return media.get("url")
    # Plain <link> elements with an audio MIME type
    for link in getattr(entry, "links", []):
        if link.get("type", "").startswith("audio/"):
            return link.get("href")
    return None


def extract_guid(entry) -> str:
    """Return a stable unique identifier for an entry.

    Preference order: the feed's own <guid>/id, then a hash of the audio
    URL, then a hash of the title (last resort — titles can collide).
    """
    if hasattr(entry, "id") and entry.id:
        return entry.id
    url = extract_audio_url(entry)
    if url:
        return hashlib.sha256(url.encode()).hexdigest()
    title = getattr(entry, "title", "")
    return hashlib.sha256(title.encode()).hexdigest()


def extract_pub_date(entry) -> str | None:
    """Return the publication date as an ISO-8601 UTC string when possible.

    feedparser's published_parsed is a UTC struct_time; fall back to the
    raw 'published' string when it is missing or malformed.
    """
    if hasattr(entry, "published_parsed") and entry.published_parsed:
        try:
            return datetime(*entry.published_parsed[:6], tzinfo=timezone.utc).isoformat()
        except Exception:
            pass
    return getattr(entry, "published", None)


# ---------------------------------------------------------------------------
# Downloading
# ---------------------------------------------------------------------------


def sanitize_filename(name: str) -> str:
    """Make a string safe to use as a filename (also on Windows shares).

    Replaces reserved characters and whitespace runs with '_' and caps
    the length at 200 characters to stay within filesystem limits.
    """
    name = re.sub(r'[<>:"/\\|?*]', "_", name)
    name = re.sub(r"\s+", "_", name)
    return name[:200]


def download_episode(url: str, feed_name: str, title: str) -> Path | None:
    """Download an audio file. Returns the local path or None on failure.

    Streams to a '.part' temp file and renames on success, so a crash or
    shutdown never leaves a truncated file under the final name.
    """
    feed_dir = PODCAST_DIR / sanitize_filename(feed_name)
    feed_dir.mkdir(parents=True, exist_ok=True)

    # Prefer the URL's basename; fall back to the episode title when the
    # URL path is missing or too short to be a real filename.
    parsed = urlparse(url)
    url_filename = Path(parsed.path).name
    if not url_filename or len(url_filename) < 3:
        url_filename = sanitize_filename(title) + ".mp3"
    dest = feed_dir / sanitize_filename(url_filename)

    if dest.exists():
        log.info("File already exists: %s", dest)
        return dest

    # Append '.part' to the whole name (with_name, not with_suffix) so any
    # name — including extensionless ones — gets a plain temp suffix.
    tmp = dest.with_name(dest.name + ".part")
    try:
        log.info("Downloading: %s", url)
        with requests.get(
            url,
            stream=True,
            timeout=DOWNLOAD_TIMEOUT,
            headers={"User-Agent": USER_AGENT},
        ) as resp:
            resp.raise_for_status()
            with open(tmp, "wb") as f:
                for chunk in resp.iter_content(chunk_size=65536):
                    # Abort promptly on SIGTERM/SIGINT mid-download.
                    if _shutdown:
                        log.warning("Shutdown during download, aborting")
                        tmp.unlink(missing_ok=True)
                        return None
                    f.write(chunk)
        tmp.rename(dest)
        log.info("Downloaded: %s (%.1f MB)", dest, dest.stat().st_size / 1048576)
        return dest
    except Exception:
        log.exception("Failed to download %s", url)
        tmp.unlink(missing_ok=True)
        return None


# ---------------------------------------------------------------------------
# Queue management
# ---------------------------------------------------------------------------


def link_to_queue(file_path: Path, episode_id: int) -> None:
    """Create a symlink in the queue directory for Liquidsoap to pick up.

    Filename is prefixed with a timestamp to ensure FIFO ordering when
    Liquidsoap reads the directory in alphabetical order.  Falls back to
    a hard link, then a full copy, where symlinks are unavailable.
    """
    QUEUE_DIR.mkdir(parents=True, exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    link_name = f"{timestamp}_{episode_id:06d}_{file_path.name}"
    link_path = QUEUE_DIR / link_name
    try:
        link_path.symlink_to(file_path.resolve())
        log.info("Queued (symlink): %s -> %s", link_name, file_path)
    except (OSError, FileExistsError):
        # Symlinks may fail on Windows without admin; fall back to hard link
        try:
            link_path.hardlink_to(file_path.resolve())
            log.info("Queued (hardlink): %s -> %s", link_name, file_path)
        except (OSError, FileExistsError):
            # Last resort: copy the file
            shutil.copy2(str(file_path), str(link_path))
            log.info("Queued (copy): %s -> %s", link_name, file_path)


def cleanup_played_queue_files(conn: sqlite3.Connection) -> None:
    """Remove queue symlinks whose targets are gone (already played).

    Path.exists() follows symlinks and reports False for a dangling link,
    so 'is_symlink() and not exists()' is exactly "broken link".  Hard-link
    and copy fallbacks are not detected by this check — known limitation.
    """
    if not QUEUE_DIR.exists():
        return
    for item in sorted(QUEUE_DIR.iterdir()):
        if item.is_symlink() and not item.exists():
            log.info("Removing broken queue link: %s", item.name)
            item.unlink()


# ---------------------------------------------------------------------------
# Main poll cycle
# ---------------------------------------------------------------------------


def poll_feed(conn: sqlite3.Connection, feed_cfg: dict) -> int:
    """Poll a single feed. Returns number of new episodes found.

    Sends stored ETag/Last-Modified validators so an unchanged feed costs
    a single 304 round-trip.
    """
    name = feed_cfg["name"]
    url = feed_cfg["url"]
    new_count = 0
    log.info("Polling feed: %s", name)

    state = get_feed_state(conn, name)
    kwargs = {}
    if state:
        if state.get("last_etag"):
            kwargs["etag"] = state["last_etag"]
        if state.get("last_modified"):
            kwargs["modified"] = state["last_modified"]

    try:
        feed = feedparser.parse(url, agent=USER_AGENT, **kwargs)
    except Exception:
        log.exception("Failed to parse feed: %s", name)
        return 0

    # bozo with no entries at all means the fetch/parse truly failed;
    # bozo with entries is usually just sloppy XML — process it anyway.
    if feed.bozo and not feed.entries:
        log.warning("Feed error for %s: %s", name, feed.bozo_exception)
        return 0

    # Always record last_poll; keep old validators when the server sent none.
    etag = getattr(feed, "etag", None)
    modified = getattr(feed, "modified", None)
    update_feed_state(conn, name, etag, modified)

    # 'status' is only present on HTTP(S) results; guard with getattr so a
    # non-HTTP parse cannot raise AttributeError here.
    if getattr(feed, "status", None) == 304:
        log.info("Feed %s: not modified", name)
        return 0

    for entry in feed.entries:
        guid = extract_guid(entry)
        if episode_exists(conn, name, guid):
            continue

        audio_url = extract_audio_url(entry)
        if not audio_url:
            log.debug("No audio URL in entry: %s", getattr(entry, "title", "unknown"))
            continue

        title = getattr(entry, "title", "Untitled")
        pub_date = extract_pub_date(entry)
        # NOTE: the episode row is written before the download, so a failed
        # download is not retried on later polls (the guid is already
        # recorded); rows with downloaded=0 could be reprocessed by an
        # external retry tool.
        episode_id = insert_episode(conn, name, guid, title, audio_url, pub_date)
        log.info("New episode: [%s] %s", name, title)

        file_path = download_episode(audio_url, name, title)
        if _shutdown:
            return new_count
        if file_path:
            mark_downloaded(conn, episode_id, str(file_path))
            enqueue_episode(conn, episode_id)
            link_to_queue(file_path, episode_id)
            new_count += 1
        else:
            log.error("Skipping episode (download failed): %s", title)

    return new_count


def poll_all_feeds() -> None:
    """Run one poll cycle over every enabled feed, then tidy the queue dir."""
    feeds = load_feeds()
    if not feeds:
        log.warning("No enabled feeds configured in %s", FEEDS_CONFIG)
        return

    conn = get_db()
    try:
        total_new = 0
        for feed_cfg in feeds:
            if _shutdown:
                break
            try:
                total_new += poll_feed(conn, feed_cfg)
            except Exception:
                # One bad feed must not abort the whole cycle.
                log.exception("Error processing feed: %s", feed_cfg.get("name"))
        cleanup_played_queue_files(conn)
        if total_new > 0:
            log.info("Poll complete: %d new episode(s) enqueued", total_new)
        else:
            log.info("Poll complete: no new episodes")
    finally:
        conn.close()


# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------


def main() -> None:
    """Run the poller loop until SIGTERM/SIGINT flips the shutdown flag."""
    log.info("Local Radio RSS Poller starting (interval=%ds)", POLL_INTERVAL)
    log.info("Config: %s", FEEDS_CONFIG)
    log.info("Database: %s", DB_PATH)
    log.info("Queue dir: %s", QUEUE_DIR)

    # Ensure directories exist
    QUEUE_DIR.mkdir(parents=True, exist_ok=True)
    PODCAST_DIR.mkdir(parents=True, exist_ok=True)

    # Ensure DB is initialized (local import: init_db is a sibling module
    # only needed at startup)
    from init_db import init_db
    init_db(DB_PATH)

    while not _shutdown:
        try:
            poll_all_feeds()
        except Exception:
            log.exception("Unexpected error in poll cycle")

        # Sleep in 1 s increments so signals are honored promptly.
        for _ in range(POLL_INTERVAL):
            if _shutdown:
                break
            time.sleep(1)

    log.info("Poller shut down cleanly")


if __name__ == "__main__":
    main()