139 lines
4.4 KiB
Python
139 lines
4.4 KiB
Python
"""Memos API client - fetch memos using Connect RPC protocol."""
|
|
|
|
import logging
|
|
from datetime import datetime, timezone, timedelta
|
|
|
|
import requests
|
|
|
|
logger = logging.getLogger(__name__)


class MemosClient:
    """Client for a self-hosted Memos API (Connect RPC protocol).

    Every call is an HTTP POST with a JSON body against
    ``<base_url>/<service>/<method>``, authenticated with a bearer token.
    Memos are returned to callers as plain dicts with the keys
    ``id``, ``content``, ``created_at``, ``tags`` and ``visibility``.
    """

    # Upper bound on the number of pages followed per listing call, so a
    # server that keeps handing out page tokens cannot loop us forever.
    _MAX_PAGES = 50

    def __init__(self, base_url, access_token):
        """Create a client.

        Args:
            base_url: Root URL of the Memos instance; trailing slashes are
                stripped.
            access_token: Token sent as ``Authorization: Bearer <token>``.
        """
        self.base_url = base_url.rstrip("/")
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        })
        # Resource name of the authenticated user; filled in by get_user().
        self._user_id = None

    def get_user(self):
        """Fetch the current user and cache its resource name.

        Returns:
            The user dict from the response (the ``user`` field when
            present, otherwise the whole response body).

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        resp = self.session.post(
            f"{self.base_url}/memos.api.v1.AuthService/GetCurrentUser",
            json={},
            timeout=10,
        )
        resp.raise_for_status()
        data = resp.json()
        user = data.get("user", data)
        self._user_id = user.get("name", "")
        logger.info("Memos user: %s (%s)", user.get("username"), self._user_id)
        return user

    def get_user_id(self):
        """Return the current user's resource name (e.g. ``'users/FXY'``).

        The value is looked up via get_user() on first use and cached.
        """
        if not self._user_id:
            self.get_user()
        return self._user_id

    def list_memos(self, days=1, page_size=100):
        """Fetch the current user's memos created within the last N days.

        Args:
            days: Size of the look-back window, in days.
            page_size: Number of memos requested per page.

        Returns:
            A list of memo dicts (``id``, ``content``, ``created_at``,
            ``tags``, ``visibility``).

        Raises:
            requests.HTTPError: If any request returns an error status.
        """
        user = self.get_user_id()
        since_dt = datetime.now(timezone.utc) - timedelta(days=days)

        # Connect RPC uses POST with a JSON body.
        memos = self._fetch_memos({
            "pageSize": page_size,
            "filter": f"creator == '{user}'",
        })
        logger.info("Memos API OK | fetched %d memos", len(memos))

        # Filter by time client-side (the server-side filter might not work
        # as expected).
        results = []
        for memo in memos:
            created_dt = self._parse_time(memo.get("createTime", ""))
            if created_dt is None:
                # Best effort: a memo with an unreadable timestamp is
                # treated as created "now" rather than being dropped.
                created_dt = datetime.now(timezone.utc)
            if created_dt < since_dt:
                continue
            results.append(self._to_record(memo))

        logger.info(
            "Filtered to %d memos since %s",
            len(results), since_dt.isoformat()[:10]
        )
        return results

    def list_all_memos_from_range(self, start_date, end_date, page_size=200):
        """Fetch memos whose creation time lies within a date range.

        Args:
            start_date: Inclusive lower bound (``datetime``). A naive value
                is interpreted as UTC.
            end_date: Inclusive upper bound (``datetime``). A naive value
                is interpreted as UTC.
            page_size: Number of memos requested per page.

        Returns:
            A list of memo dicts (``id``, ``content``, ``created_at``,
            ``tags``, ``visibility``). Memos without a parseable
            ``createTime`` are skipped.

        Raises:
            requests.HTTPError: If any request returns an error status.
        """
        # The result is not used (this request carries no creator filter);
        # the call is kept so the user is still resolved and cached here.
        self.get_user_id()

        range_start = self._as_utc(start_date)
        range_end = self._as_utc(end_date)

        memos = self._fetch_memos({"pageSize": page_size})
        logger.info("Memos API OK | fetched %d memos total", len(memos))

        # Client-side date range filtering.
        results = []
        for memo in memos:
            created_dt = self._parse_time(memo.get("createTime", ""))
            if created_dt is None:
                continue
            if created_dt < range_start or created_dt > range_end:
                continue
            results.append(self._to_record(memo))

        logger.info(
            "Filtered to %d memos in range %s - %s",
            len(results), start_date.strftime("%m/%d"), end_date.strftime("%m/%d")
        )
        return results

    def _fetch_memos(self, payload):
        """POST to MemoService/ListMemos and collect memos from every page.

        Follows ``nextPageToken`` from each response by resending the
        request with ``pageToken`` set, until no token is returned, a token
        repeats, or ``_MAX_PAGES`` pages have been read.
        """
        url = f"{self.base_url}/memos.api.v1.MemoService/ListMemos"
        request = dict(payload)
        memos = []
        seen_tokens = set()

        for _ in range(self._MAX_PAGES):
            resp = self.session.post(url, json=request, timeout=15)
            resp.raise_for_status()
            data = resp.json()
            memos.extend(data.get("memos") or [])

            token = data.get("nextPageToken")
            if not token or token in seen_tokens:
                return memos
            seen_tokens.add(token)
            # Build a fresh dict so the body already sent is never mutated.
            request = {**request, "pageToken": token}

        logger.warning(
            "Memos pagination stopped after %d pages; results may be incomplete",
            self._MAX_PAGES,
        )
        return memos

    def _to_record(self, memo):
        """Convert one raw memo dict from the API into the client's shape."""
        content = memo.get("content") or ""
        return {
            # The id is the last path segment of the resource name.
            "id": (memo.get("name") or "").split("/")[-1],
            "content": content,
            "created_at": memo.get("createTime", ""),
            "tags": self._extract_tags(content),
            "visibility": memo.get("visibility", ""),
        }

    @staticmethod
    def _parse_time(value):
        """Parse an ISO-8601 timestamp into an aware datetime, or None.

        A trailing ``Z`` is accepted as UTC. A timestamp without an offset
        is assumed to be UTC so it can be compared with aware datetimes.
        """
        try:
            parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
        except (ValueError, AttributeError, TypeError):
            return None
        return MemosClient._as_utc(parsed)

    @staticmethod
    def _as_utc(moment):
        """Return *moment* unchanged if aware, else tagged as UTC."""
        if moment.tzinfo is None:
            return moment.replace(tzinfo=timezone.utc)
        return moment

    @staticmethod
    def _extract_tags(content):
        """Extract #tags from memo content (empty list for no content)."""
        import re
        return re.findall(r"#(\w[\w\-]*)", content or "")
|