Files
rss-proxy/proxy/rss_proxy.py
bacon 0b5e8e5e8a update rss_proxy.py
* removed guid rewrite
2025-03-10 20:54:49 +03:00

110 lines
3.5 KiB
Python
Executable File

import html
import urllib.parse
import os
import requests
import redis
import xml.etree.ElementTree as ET
import re
import unicodedata
from flask import request, Response
# Optional upstream HTTP/HTTPS proxy for outbound requests; direct when unset.
PROXY_URL = os.getenv("PROXY_URL")
# Redis connection string for the per-item feed cache.
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
# Lifetime of each cached feed item, in seconds (default one hour).
CACHE_TTL = int(os.getenv("CACHE_TTL", 3600))
# Shared Redis client used by the /proxy route for item caching.
rdb = redis.from_url(REDIS_URL)
# Static RSS envelope prepended to every proxied response; the matching
# </channel></rss> closers are appended when the response is assembled.
_head_html = f"""<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0">
<channel><title>Tapochek.net RSS</title>
<link>http://tapochek.net/</link>
<ttl>15</ttl>"""
def normalize_text(text):
    """Return *text* normalized to Unicode NFKC form.

    Falsy values (None, empty string) are returned unchanged, so the
    function is safe to call on optional XML text nodes.
    """
    return unicodedata.normalize("NFKC", text) if text else text
def extract_viewtopic_link(description):
    """Return the first http(s) href found in *description*, or None.

    The description is HTML-unescaped first, so entity-encoded markup
    (``&lt;a href=&quot;...&quot;&gt;``) is handled as well.
    """
    decoded = html.unescape(description)
    found = re.search(r'href="(https?://[^"]+)"', decoded)
    if found:
        return found.group(1)
    return None
def normalize_xml_texts(elem):
    """NFKC-normalize the text and tail of *elem* and every descendant.

    Walks the tree iteratively with an explicit stack; only truthy
    text/tail values are rewritten, matching normalize_text's contract.
    """
    stack = [elem]
    while stack:
        node = stack.pop()
        if node.text:
            node.text = unicodedata.normalize("NFKC", node.text)
        if node.tail:
            node.tail = unicodedata.normalize("NFKC", node.tail)
        stack.extend(node)
def init_proxy(app):
    """Register the /proxy route on *app*.

    The route fetches an upstream RSS feed, sanitizes it into parseable
    UTF-8 XML, caches each <item> in Redis keyed by its GUID, and returns
    a rebuilt RSS document combining cached and freshly-fetched items.
    """
    @app.route("/proxy")
    def proxy():
        """Proxy an RSS feed with per-item Redis caching.

        The query string must be ``url=<feed url>``; it is parsed by hand
        so that unescaped ``&`` inside the target URL is not split into
        separate request parameters.
        """
        raw_query = request.query_string.decode()
        if not raw_query.startswith("url="):
            return "Missing URL", 400
        url = html.unescape(urllib.parse.unquote(raw_query[4:]))
        try:
            proxies = {"http": PROXY_URL, "https": PROXY_URL} if PROXY_URL else None
            r = requests.get(url, timeout=10, proxies=proxies)
            xml_data = r.text
            # Upstream feeds contain bare "&" that breaks the XML parser;
            # escape every "&" (already-escaped entities briefly become
            # "&amp;amp;" and are undone by html.unescape further down).
            xml_data = xml_data.replace("&", "&amp;")
            # Force a UTF-8 prolog. Only extract the declared encoding when
            # a declaration actually exists — the old unconditional
            # split('encoding="')[1] raised IndexError on feeds without one.
            decl_prefix = '<?xml version="1.0" encoding="'
            if decl_prefix in xml_data:
                declared = xml_data.split('encoding="', 1)[1].split('"', 1)[0]
                xml_data = xml_data.replace(
                    f'{decl_prefix}{declared}"?>',
                    '<?xml version="1.0" encoding="UTF-8"?>'
                )
            if '<?xml version="1.0" encoding="UTF-8"?>' not in xml_data:
                xml_data = f'<?xml version="1.0" encoding="UTF-8"?>{xml_data}'
            root = ET.fromstring(xml_data)
            cached_items = []
            new_items = []
            for item in root.findall(".//item"):
                guid = item.find("guid")
                if guid is None or not guid.text:
                    continue  # no stable cache key without a GUID
                title = item.find("title")
                # Guard against missing/empty <title>: re.sub on None crashed.
                if title is not None and title.text:
                    # Drop leftover numeric character references produced by
                    # the "&" escaping above; also consume the trailing ";"
                    # (the old pattern r'&#\d+' left stray semicolons).
                    title.text = re.sub(r'&#\d+;?', '', title.text)
                cache_key = f"rss:item:{guid.text}"
                cached_item = rdb.get(cache_key)
                if cached_item:
                    cached_items.append(cached_item.decode())
                else:
                    normalize_xml_texts(item)
                    item_str = ET.tostring(item, encoding="unicode")
                    item_str = html.unescape(item_str)
                    rdb.setex(cache_key, CACHE_TTL, item_str)
                    new_items.append(item_str)
            final_items = cached_items + new_items
            response_xml = f"""{_head_html}{"".join(final_items)}</channel></rss>"""
            return Response(response_xml, content_type="application/xml; charset=utf-8")
        except Exception as e:
            # Top-level boundary: surface any fetch/parse/cache failure as 500.
            return f"Error: {e}", 500