import os
import time
import json
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta

import requests
from transformers import pipeline

from generate_tiktok_feed import save_tiktok_feed

# ======================
# CONFIGURATION
# ======================

def load_config():
    """Load configuration from config.json file."""
    config_file = "config.json"

    # Default configuration (fallback)
    default_config = {
        "interests": {
            "Efficient ML / Edge AI": {
                "query": 'cat:cs.LG OR cat:cs.CV OR cat:cs.CL',
                "keywords": ['efficient', 'edge', 'compression', 'quantization',
                             'pruning', 'distillation', 'inference', 'lightweight',
                             'mobile', 'accelerat']
            }
        },
        "settings": {
            "papers_per_interest": 10,
            "summary_max_length": 160,
            "recent_days": 7,
            "fallback_days": 90,
            "min_papers_threshold": 5,
            "fetch_multiplier": 5,
            "user_agent": "ResearchDigestBot/1.0 (github.com/wedsmoker)"
        }
    }

    if os.path.exists(config_file):
        try:
            with open(config_file, 'r', encoding='utf-8') as f:
                config = json.load(f)
            print(f"✅ Loaded configuration from {config_file}")
            return config
        except Exception as e:
            print(f"⚠️ Error loading config file: {e}. Using defaults.")
            return default_config
    else:
        print(f"⚠️ {config_file} not found. Using default configuration.")
        return default_config


# Load configuration
config = load_config()
INTERESTS = config.get('interests', {})
settings = config.get('settings', {})

PAPERS_PER_INTEREST = settings.get('papers_per_interest', 10)
SUMMARY_MAX_LENGTH = settings.get('summary_max_length', 160)
USER_AGENT = settings.get('user_agent', 'ResearchDigestBot/1.0')

# Date filtering: Only fetch papers from the last N days (set to 0 to disable)
RECENT_DAYS = settings.get('recent_days', 7)
FALLBACK_DAYS = settings.get('fallback_days', 90)
MIN_PAPERS_THRESHOLD = settings.get('min_papers_threshold', 5)
FETCH_MULTIPLIER = settings.get('fetch_multiplier', 5)

# Deduplication: Track papers we've already shown
SEEN_PAPERS_FILE = "seen_papers.json"

# Initialize summarizer (optional)
try:
    summarizer = pipeline(
        "summarization",
        model="sshleifer/distilbart-cnn-12-6",
        device=-1
    )
except Exception as e:
    print(f"⚠️ Summarizer unavailable ({e}). Using raw abstracts.")
    summarizer = None
Using raw abstracts.") summarizer = None # ====================== # DEDUPLICATION HELPERS # ====================== def load_seen_papers(): """Load the set of previously seen paper IDs.""" if os.path.exists(SEEN_PAPERS_FILE): try: with open(SEEN_PAPERS_FILE, 'r') as f: data = json.load(f) return set(data.get('seen_ids', [])) except Exception as e: print(f"⚠️ Error loading seen papers: {e}") return set() def save_seen_papers(seen_ids): """Save the set of seen paper IDs.""" try: with open(SEEN_PAPERS_FILE, 'w') as f: json.dump({ 'seen_ids': list(seen_ids), 'last_updated': datetime.now().isoformat() }, f, indent=2) except Exception as e: print(f"⚠️ Error saving seen papers: {e}") def get_date_filter(days=None): """Generate date filter for arXiv query (last N days).""" if days is None: days = RECENT_DAYS if days <= 0: return "" end_date = datetime.now() start_date = end_date - timedelta(days=days) # arXiv date format: YYYYMMDD0000 to YYYYMMDD2359 date_filter = f"submittedDate:[{start_date.strftime('%Y%m%d')}0000 TO {end_date.strftime('%Y%m%d')}2359]" return date_filter # ====================== # ARXIV FETCH & PARSE # ====================== def fetch_arxiv_papers(query, max_results=5, days_back=None): url = "http://export.arxiv.org/api/query" # Add date filter if configured date_filter = get_date_filter(days_back) if date_filter: # Combine user query with date filter using AND query = f"({query}) AND {date_filter}" params = { "search_query": query, "start": 0, "max_results": max_results, "sortBy": "submittedDate", "sortOrder": "descending" } headers = {"User-Agent": USER_AGENT} try: response = requests.get(url, params=params, headers=headers, timeout=20) response.raise_for_status() return response.text except Exception as e: print(f"❌ Error fetching query '{query}': {e}") return None def parse_papers(xml_data): if not xml_data: return [] try: root = ET.fromstring(xml_data) except ET.ParseError: return [] namespace = {'atom': 'http://www.w3.org/2005/Atom'} papers = [] for entry in root.findall('atom:entry', namespace): title_elem = entry.find('atom:title', namespace) summary_elem = entry.find('atom:summary', namespace) id_elem = entry.find('atom:id', namespace) published_elem = entry.find('atom:published', namespace) if None in (title_elem, summary_elem, id_elem): continue title = ' '.join(title_elem.text.strip().split()) summary = ' '.join(summary_elem.text.strip().split()) link = id_elem.text published = published_elem.text.split('T')[0] if published_elem is not None else "Unknown" # Extract arXiv ID arxiv_id = link.split('/abs/')[-1].split('v')[0] # Get primary category primary_cat_elem = entry.find('.//{http://arxiv.org/schemas/atom}primary_category') category = primary_cat_elem.get('term') if primary_cat_elem is not None else "unknown" papers.append({ 'title': title, 'summary': summary, 'link': link, 'pdf_link': f"https://arxiv.org/pdf/{arxiv_id}.pdf", 'arxiv_id': arxiv_id, 'category': category, 'published': published }) return papers def summarize_abstract(abstract): if summarizer is None: return abstract[:SUMMARY_MAX_LENGTH] + ("..." if len(abstract) > SUMMARY_MAX_LENGTH else "") try: if len(abstract.split()) < 15: return abstract result = summarizer( abstract, max_length=min(SUMMARY_MAX_LENGTH, 142), min_length=30, truncation=True ) return result[0]['summary_text'] except Exception as e: return abstract[:SUMMARY_MAX_LENGTH] + "..." 
def calculate_relevance_score(paper, keywords):
    """Calculate relevance score based on keyword matches in title and abstract."""
    title_lower = paper['title'].lower()
    abstract_lower = paper['summary'].lower()

    score = 0
    matched_keywords = []

    for keyword in keywords:
        keyword_lower = keyword.lower()
        # Title matches are worth more
        if keyword_lower in title_lower:
            score += 3
            matched_keywords.append(keyword)
        # Abstract matches
        elif keyword_lower in abstract_lower:
            score += 1
            matched_keywords.append(keyword)

    # Bonus for multiple keyword matches
    if len(matched_keywords) > 2:
        score += len(matched_keywords) - 2

    paper['relevance_score'] = score
    paper['matched_keywords'] = matched_keywords
    return score


def estimate_difficulty(abstract, category):
    """Estimate paper difficulty using heuristic keyword analysis."""
    abstract_lower = abstract.lower()

    # Theory-heavy indicators
    complexity_words = ['theoretical', 'proof', 'theorem', 'convergence', 'optimal',
                        'asymptotic', 'lemma', 'proposition', 'rigorous', 'formalism']
    # Applied/practical indicators
    applied_words = ['system', 'framework', 'application', 'dataset', 'benchmark',
                     'implementation', 'experiment', 'empirical', 'practical']
    # Math-heavy categories
    math_categories = ['math.', 'stat.', 'quant-ph']

    # Calculate score
    score = sum(1 for w in complexity_words if w in abstract_lower)
    score -= sum(0.5 for w in applied_words if w in abstract_lower)

    # Category bonus
    if any(cat in category for cat in math_categories):
        score += 1

    # Determine difficulty level
    if score > 2:
        return "🔴 Theory-Heavy"
    elif score > 0.5:
        return "🟡 Advanced"
    else:
        return "🟢 Applied"


def generate_layman_context(title, abstract):
    """Generate simple layman explanation using keyword extraction and templates."""
    abstract_lower = abstract.lower()

    # Extract key action words and concepts
    action_map = {
        'improv': 'improves',
        'reduc': 'reduces',
        'enhanc': 'enhances',
        'optimi': 'optimizes',
        'acceler': 'speeds up',
        'efficient': 'makes more efficient',
        'novel': 'introduces a new approach to',
        'outperform': 'works better than existing methods for',
        'achiev': 'achieves better',
        'propose': 'proposes a method for',
        'present': 'presents techniques for',
        'address': 'tackles the problem of',
        'privacy': 'protecting data privacy in',
        'federated': 'distributed machine learning across',
        'emotion': 'understanding emotions in',
        'embedded': 'running AI on low-power devices for',
        'edge': 'running AI locally on devices for',
        'compression': 'making models smaller for',
        'inference': 'faster predictions in',
        'generative': 'creating new content with',
        'detection': 'automatically finding',
        'classification': 'categorizing',
        'prediction': 'forecasting'
    }

    # Find first matching action
    action = "explores techniques in"
    for keyword, phrase in action_map.items():
        if keyword in abstract_lower[:300]:  # Check first part of abstract
            action = phrase
            break

    # Extract domain
    domain = "machine learning"
    if "language model" in abstract_lower or "llm" in abstract_lower or "nlp" in abstract_lower:
        domain = "language AI"
    elif "vision" in abstract_lower or "image" in abstract_lower or "visual" in abstract_lower:
        domain = "computer vision"
    elif "speech" in abstract_lower or "audio" in abstract_lower:
        domain = "speech processing"
    elif "privacy" in abstract_lower or "federated" in abstract_lower:
        domain = "privacy-preserving AI"
    elif "edge" in abstract_lower or "embedded" in abstract_lower or "device" in abstract_lower:
        domain = "edge computing"
    elif "emotion" in abstract_lower or "affective" in abstract_lower:
        domain = "emotion AI"

    return f"This research {action} {domain}."
research {action} {domain}." # ====================== # HTML OUTPUT # ====================== def save_html_digest(all_papers_by_interest, filename=None): # Create archive directory if it doesn't exist archive_dir = "arxiv_archive" if not os.path.exists(archive_dir): os.makedirs(archive_dir) if filename is None: date_str = datetime.now().strftime('%Y%m%d') filename = os.path.join(archive_dir, f"arxiv_digest_{date_str}.html") # Also save as latest.html for easy syncing latest_file = "latest.html" html = f"""
No recent papers found.
\n' else: html += '