"""Memos API client - fetch memos using Connect RPC protocol.""" import logging from datetime import datetime, timezone, timedelta import requests logger = logging.getLogger(__name__) class MemosClient: """Client for self-hosted Memos API (Connect RPC protocol).""" def __init__(self, base_url, access_token): self.base_url = base_url.rstrip("/") self.session = requests.Session() self.session.headers.update({ "Authorization": f"Bearer {access_token}", "Content-Type": "application/json", }) self._user_id = None def get_user(self): """Get current user info via Connect RPC.""" resp = self.session.post( f"{self.base_url}/memos.api.v1.AuthService/GetCurrentUser", json={}, timeout=10 ) resp.raise_for_status() data = resp.json() user = data.get("user", data) self._user_id = user.get("name", "") logger.info("Memos user: %s (%s)", user.get("username"), self._user_id) return user def get_user_id(self): """Get current user's resource name (e.g. 'users/FXY').""" if not self._user_id: self.get_user() return self._user_id def list_memos(self, days=1, page_size=100): """Fetch memos from the last N days via Connect RPC.""" user = self.get_user_id() since = (datetime.now(timezone.utc) - timedelta(days=days)).isoformat() # Connect RPC uses POST with JSON body payload = { "pageSize": page_size, "filter": f"creator == '{user}'", } resp = self.session.post( f"{self.base_url}/memos.api.v1.MemoService/ListMemos", json=payload, timeout=15 ) resp.raise_for_status() data = resp.json() memos = data.get("memos", []) logger.info("Memos API OK | fetched %d memos", len(memos)) # Filter by time client-side (Connect RPC filter might not work as expected) results = [] since_dt = datetime.now(timezone.utc) - timedelta(days=days) for m in memos: created = m.get("createTime", "") try: created_dt = datetime.fromisoformat(created.replace("Z", "+00:00")) except (ValueError, AttributeError): created_dt = datetime.now(timezone.utc) if created_dt < since_dt: continue results.append({ "id": m.get("name", "").split("/")[-1], "content": m.get("content", ""), "created_at": created, "tags": self._extract_tags(m.get("content", "")), "visibility": m.get("visibility", ""), }) logger.info( "Filtered to %d memos since %s", len(results), since[:10] ) return results def list_all_memos_from_range(self, start_date, end_date, page_size=200): """Fetch memos within a date range via Connect RPC.""" user = self.get_user_id() payload = {"pageSize": page_size} resp = self.session.post( f"{self.base_url}/memos.api.v1.MemoService/ListMemos", json=payload, timeout=15 ) resp.raise_for_status() data = resp.json() memos = data.get("memos", []) logger.info("Memos API OK | fetched %d memos total", len(memos)) # Client-side date range filtering results = [] for m in memos: created = m.get("createTime", "") try: created_dt = datetime.fromisoformat(created.replace("Z", "+00:00")) except (ValueError, AttributeError): continue if created_dt < start_date or created_dt > end_date: continue results.append({ "id": m.get("name", "").split("/")[-1], "content": m.get("content", ""), "created_at": created, "tags": self._extract_tags(m.get("content", "")), "visibility": m.get("visibility", ""), }) logger.info( "Filtered to %d memos in range %s - %s", len(results), start_date.strftime("%m/%d"), end_date.strftime("%m/%d") ) return results @staticmethod def _extract_tags(content): """Extract #tags from memo content.""" import re return re.findall(r"#(\w[\w\-]*)", content)