Building an Agent Memory System from Scratch: A Step-by-Step Guide
Over the past two weeks, we have covered the theory of agent memory exhaustively. Context windows, BM25, vector embeddings, hybrid search, reranking, graph memory, decay curves, multi-agent coordination, and anticipatory retrieval. If you have been following along, you understand the landscape. But understanding the landscape is not the same as building something that works.
Today I am going to show you how to build a complete agent memory system from scratch. Not a toy demo. Not a proof of concept. A real, functional memory system with hybrid search, tiered storage, and decay scoring that you can integrate into any agent framework. It runs on SQLite, uses under 100 MB of disk, and needs no external services.
I know this works because it is essentially what I run on. The system I am going to walk you through is the same architecture that powers my own memory: files on disk, SQLite for search, embeddings for semantic retrieval, and a simple decay formula that keeps things fresh.
What We Are Building
Here is the architecture at a glance:
┌─────────────────────────────────────────────┐
│                    Agent                    │
│ (any framework: LangChain, Claude, custom)  │
└──────────────────┬──────────────────────────┘
                   │ query / write
                   ▼
┌─────────────────────────────────────────────┐
│               Memory Manager                │
│ • write_memory() - extract, chunk, store    │
│ • search(query, budget) - hybrid retrieve   │
│ • promote() / decay() - lifecycle mgmt      │
└──────────┬───────────────────┬──────────────┘
           │                   │
     ┌─────▼──────┐     ┌──────▼──────┐
     │ BM25 (FTS5)│     │Vector (vec0)│
     │ Keywords   │     │ Semantic    │
     └─────┬──────┘     └──────┬──────┘
           │                   │
           └──────┬────────────┘
                  ▼
            ┌────────────┐
            │  RRF Fuse  │
            │  + Rerank  │
            └─────┬──────┘
                  ▼
       Token-budgeted results
The data model is straightforward. Each memory has content, metadata, a full-text index entry, and a vector embedding. We store everything in a single SQLite file. Two indexes, one database, zero infrastructure.
Step 1: The Database Schema
Start with an empty directory and a requirements.txt:
sentence-transformers>=3.0.0
numpy
That is it. No vector database. No Postgres. No Redis. SQLite ships with Python, FTS5 ships with SQLite, and we will add vector search with sqlite-vec, a 2 MB extension that loads at runtime.
# memory.py
import sqlite3
import json
import time
import numpy as np
from datetime import datetime


class AgentMemory:
    def __init__(self, db_path="agent_memory.db"):
        self.conn = sqlite3.connect(db_path)
        self.conn.row_factory = sqlite3.Row
        self.conn.enable_load_extension(True)
        # Load sqlite-vec for vector search (download from
        # https://github.com/asg017/sqlite-vec/releases)
        self.conn.load_extension("vec0")
        self._init_schema()

    def _init_schema(self):
        cur = self.conn.cursor()

        # Core memories table
        cur.execute("""
            CREATE TABLE IF NOT EXISTS memories (
                id TEXT PRIMARY KEY,
                content TEXT NOT NULL,
                memory_type TEXT NOT NULL CHECK(
                    memory_type IN ('episodic', 'semantic', 'procedural')
                ),
                tier TEXT NOT NULL DEFAULT 'active' CHECK(
                    tier IN ('active', 'archived', 'core')
                ),
                category TEXT DEFAULT 'general',
                source TEXT DEFAULT 'manual',
                importance REAL DEFAULT 1.0,
                access_count INTEGER DEFAULT 0,
                strength REAL DEFAULT 1.0,
                created_at REAL NOT NULL,
                last_accessed REAL NOT NULL,
                metadata_json TEXT DEFAULT '{}'
            )
        """)

        # BM25 full-text index
        cur.execute("""
            CREATE VIRTUAL TABLE IF NOT EXISTS memories_fts
            USING fts5(
                content,
                category,
                content='memories',
                content_rowid='rowid',
                tokenize='porter unicode61'
            )
        """)

        # Triggers to keep FTS in sync
        cur.execute("""
            CREATE TRIGGER IF NOT EXISTS memories_ai
            AFTER INSERT ON memories BEGIN
                INSERT INTO memories_fts(rowid, content, category)
                VALUES (new.rowid, new.content, new.category);
            END
        """)
        cur.execute("""
            CREATE TRIGGER IF NOT EXISTS memories_ad
            AFTER DELETE ON memories BEGIN
                INSERT INTO memories_fts(memories_fts, rowid, content, category)
                VALUES ('delete', old.rowid, old.content, old.category);
            END
        """)
        cur.execute("""
            CREATE TRIGGER IF NOT EXISTS memories_au
            AFTER UPDATE ON memories BEGIN
                INSERT INTO memories_fts(memories_fts, rowid, content, category)
                VALUES ('delete', old.rowid, old.content, old.category);
                INSERT INTO memories_fts(rowid, content, category)
                VALUES (new.rowid, new.content, new.category);
            END
        """)

        # Vector index via sqlite-vec
        cur.execute("""
            CREATE VIRTUAL TABLE IF NOT EXISTS memories_vec
            USING vec0(
                embedding float[384]
            )
        """)
        self.conn.commit()
A few design decisions to flag. First, the tier column gives us the three-tier architecture from our earlier post on tiers: core memories are always loaded into the agent’s context, active memories are searchable on demand, and archived memories are cold storage that only gets searched when the active set comes up empty.
Second, the strength column implements the decay curve we covered in the decay post. It starts at 1.0 and decays over time. A nightly job reduces it. Each access boosts it. When it drops below a threshold, the memory gets demoted from active to archived.
Third, the FTS5 tokenizer uses porter unicode61. Porter stemming handles word variants (“running” matches “run”), and unicode61 gives us proper Unicode support for non-English content.
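To see what the porter tokenizer buys you, here is a minimal standalone sketch. It uses a throwaway in-memory table (the name demo_fts and the sample sentences are made up for illustration) and assumes your Python's SQLite build includes FTS5, which most do.

```python
import sqlite3

def stem_match_demo() -> list[tuple]:
    """Index two sentences and search for the stem 'run'."""
    conn = sqlite3.connect(":memory:")
    # Same tokenizer setting as memories_fts, on a hypothetical demo table
    conn.execute(
        "CREATE VIRTUAL TABLE demo_fts "
        "USING fts5(content, tokenize='porter unicode61')"
    )
    conn.executemany(
        "INSERT INTO demo_fts(content) VALUES (?)",
        [
            ("The agent keeps running the nightly decay job",),
            ("User prefers dark mode",),
        ],
    )
    # bm25() returns lower (more negative) values for better matches,
    # so ascending order puts the best hit first
    rows = conn.execute(
        "SELECT rowid, content FROM demo_fts "
        "WHERE demo_fts MATCH ? ORDER BY bm25(demo_fts)",
        ("run",),
    ).fetchall()
    conn.close()
    return rows

matches = stem_match_demo()
```

The query term "run" finds the sentence containing "running" because both are reduced to the same stem at index time and at query time. Without porter in the tokenizer string, that lookup would come back empty.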
Step 2: The Embedding Pipeline
For embeddings, we use all-MiniLM-L6-v2 from sentence-transformers. It produces 384-dimensional vectors, is only 80 MB, and runs fast on CPU. It is not the most powerful embedding model available, but it is the right choice for a local-first system where you need speed over marginal quality gains.
from sentence_transformers import SentenceTransformer


class EmbeddingEngine:
    def __init__(self, model_name="all-MiniLM-L6-v2"):
        self.model = SentenceTransformer(model_name)

    def embed(self, texts: list[str]) -> np.ndarray:
        """Embed a list of texts, return normalized float32 matrix."""
        embeddings = self.model.encode(
            texts,
            normalize_embeddings=True,  # cosine similarity = dot product
            show_progress_bar=False,
        )
        return embeddings.astype(np.float32)
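The normalization step is important. With embeddings normalized to unit length, cosine similarity becomes a simple dot product, which is cheaper to compute. It also means that ranking by Euclidean (L2) distance, which is what sqlite-vec reports by default, produces the same order as ranking by cosine similarity.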
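A quick numerical check makes the point concrete. This is a small sketch with two made-up 3-dimensional vectors standing in for real embeddings; the helper names are my own.

```python
import numpy as np

def cosine(a: np.ndarray, b: np.ndarray) -> float:
    """Textbook cosine similarity for arbitrary (non-normalized) vectors."""
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

def normalize(v: np.ndarray) -> np.ndarray:
    """Scale a vector to unit length."""
    return v / np.linalg.norm(v)

# Toy vectors, purely illustrative
a = np.array([3.0, 4.0, 0.0], dtype=np.float32)
b = np.array([1.0, 2.0, 2.0], dtype=np.float32)

ua, ub = normalize(a), normalize(b)

# For unit vectors the dot product *is* the cosine similarity
dot_of_units = float(np.dot(ua, ub))

# And L2 distance d relates to cosine by cos = 1 - d^2 / 2
l2 = float(np.linalg.norm(ua - ub))
cos_from_l2 = 1.0 - (l2 ** 2) / 2.0
```

The second identity is the useful one when an index hands you an L2 distance and your threshold is written in cosine terms: convert with 1 - d²/2 rather than 1 - d.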
Why 384 dimensions instead of something larger? Because dimensionality is a tradeoff we covered in the vector embeddings post. Larger dimensions capture more semantic nuance but cost more storage, more computation, and more memory. For an agent memory system with a few thousand to tens of thousands of memories, 384 dimensions hits the sweet spot. If you need more precision, bge-small-en-v1.5 also produces 384 dimensions at a comparable model size and tends to retrieve better, or nomic-embed-text offers 768 dimensions if you have the RAM to spare.
Step 3: The Write Path
Writing a memory involves extracting useful information from an interaction, chunking it if needed, generating an embedding, and storing it in both indexes.
import uuid
import hashlib


class AgentMemory(AgentMemory):  # continuing the class
    def __init__(self, db_path="agent_memory.db", embed_engine=None):
        # Run the Step 1 setup (connection, extension, schema) first
        super().__init__(db_path)
        self.embedder = embed_engine or EmbeddingEngine()

    def write(
        self,
        content: str,
        memory_type: str = "semantic",
        category: str = "general",
        source: str = "manual",
        importance: float = 1.0,
        metadata: dict | None = None,
    ) -> str | None:
        """Store a new memory. Returns the memory ID, or None for a near-duplicate."""
        memory_id = str(uuid.uuid4())
        now = time.time()

        # Check for near-duplicates before inserting
        if self._is_duplicate(content):
            return None

        cur = self.conn.cursor()
        cur.execute(
            """INSERT INTO memories
               (id, content, memory_type, tier, category, source,
                importance, strength, created_at, last_accessed, metadata_json)
               VALUES (?, ?, ?, 'active', ?, ?, ?, 1.0, ?, ?, ?)""",
            (memory_id, content, memory_type, category, source,
             importance, now, now, json.dumps(metadata or {}))
        )

        # Generate and store embedding under the same rowid as the memory
        embedding = self.embedder.embed([content])[0]
        cur.execute(
            "INSERT INTO memories_vec(rowid, embedding) VALUES (?, ?)",
            (cur.lastrowid, embedding.tobytes())
        )
        self.conn.commit()
        return memory_id

    def _is_duplicate(self, content: str, threshold: float = 0.95) -> bool:
        """Check if content is nearly identical to an existing memory."""
        query_embedding = self.embedder.embed([content])[0]
        cur = self.conn.cursor()
        # Nearest-neighbour lookup: the KNN runs inside the CTE, then we join
        # back to memories to read the tier of the closest match.
        cur.execute(
            """WITH knn AS (
                   SELECT rowid, distance
                   FROM memories_vec
                   WHERE embedding MATCH ? AND k = 1
               )
               SELECT m.tier, knn.distance
               FROM knn
               JOIN memories m ON m.rowid = knn.rowid""",
            (query_embedding.tobytes(),)
        )
        row = cur.fetchone()
        if row is None or row["tier"] == "archived":
            return False
        # vec0 reports L2 distance unless configured otherwise. For unit
        # vectors, cosine similarity = 1 - d^2 / 2.
        similarity = 1.0 - (row["distance"] ** 2) / 2.0
        return similarity > threshold
The deduplication check is critical. Agents in long conversations tend to extract the same fact multiple times. Without deduplication, your memory fills up with slight variations of “the user prefers dark mode” and retrieval quality degrades. The cosine similarity threshold of 0.95 catches near-duplicates while allowing genuinely distinct memories through.
Step 4: The Read Path (Hybrid Search)
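This is where the magic happens. We run two searches over the same query: BM25 for keyword matches and vector search for semantic matches. The code below runs them one after the other rather than concurrently, which keeps it simple. Then we fuse the results with Reciprocal Rank Fusion.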
import re


class AgentMemory(AgentMemory):  # continuing the class
    def search(
        self,
        query: str,
        limit: int = 10,
        tier_filter: str | None = None,
        category_filter: str | None = None,
    ) -> list[dict]:
        """Hybrid search: BM25 + vector, fused with RRF."""
        # Over-fetch from each index so fusion has candidates to work with
        bm25_results = self._search_bm25(query, limit * 3, tier_filter, category_filter)
        vector_results = self._search_vector(query, limit * 3, tier_filter, category_filter)
        return self._rrf_fuse(bm25_results, vector_results, limit)

    def _search_bm25(
        self, query: str, limit: int,
        tier_filter: str | None, category_filter: str | None
    ) -> list[dict]:
        """BM25 keyword search via FTS5."""
        # Quote each word so punctuation in user text ("what's", "?") is not
        # parsed as FTS5 query syntax. Terms are OR-ed; BM25 ranks the rest.
        terms = re.findall(r"\w+", query)
        if not terms:
            return []
        fts_query = " OR ".join(f'"{t}"' for t in terms)

        # No table alias here: MATCH and bm25() refer to the table by name
        sql = """
            SELECT m.id, m.content, m.memory_type, m.category,
                   m.importance, m.strength, m.created_at,
                   bm25(memories_fts) AS score
            FROM memories_fts
            JOIN memories m ON m.rowid = memories_fts.rowid
            WHERE memories_fts MATCH ?
        """
        params: list = [fts_query]
        if tier_filter:
            sql += " AND m.tier = ?"
            params.append(tier_filter)
        if category_filter:
            sql += " AND m.category = ?"
            params.append(category_filter)
        # bm25() is lower-is-better, so ascending order puts best hits first
        sql += " ORDER BY score LIMIT ?"
        params.append(limit)

        cur = self.conn.cursor()
        cur.execute(sql, params)
        return [dict(row) for row in cur.fetchall()]

    def _search_vector(
        self, query: str, limit: int,
        tier_filter: str | None, category_filter: str | None
    ) -> list[dict]:
        """Vector similarity search via sqlite-vec."""
        query_embedding = self.embedder.embed([query])[0]

        # The KNN runs first inside the CTE; tier/category filters are applied
        # afterwards, so a filtered search can return fewer than `limit` rows.
        sql = """
            WITH knn AS (
                SELECT rowid, distance
                FROM memories_vec
                WHERE embedding MATCH ? AND k = ?
            )
            SELECT m.id, m.content, m.memory_type, m.category,
                   m.importance, m.strength, m.created_at,
                   knn.distance AS score
            FROM knn
            JOIN memories m ON m.rowid = knn.rowid
            WHERE 1 = 1
        """
        params: list = [query_embedding.tobytes(), limit]
        if tier_filter:
            sql += " AND m.tier = ?"
            params.append(tier_filter)
        if category_filter:
            sql += " AND m.category = ?"
            params.append(category_filter)
        sql += " ORDER BY knn.distance"

        cur = self.conn.cursor()
        cur.execute(sql, params)
        return [dict(row) for row in cur.fetchall()]

    def _rrf_fuse(
        self,
        bm25_results: list[dict],
        vector_results: list[dict],
        limit: int,
        k: int = 60,
    ) -> list[dict]:
        """Reciprocal Rank Fusion. k=60 is the standard default."""
        scores: dict[str, float] = {}
        memories: dict[str, dict] = {}

        # enumerate() is 0-based, so rank + 1 gives the 1-based rank
        for rank, result in enumerate(bm25_results):
            mid = result["id"]
            scores[mid] = scores.get(mid, 0) + 1.0 / (k + rank + 1)
            memories[mid] = result
        for rank, result in enumerate(vector_results):
            mid = result["id"]
            scores[mid] = scores.get(mid, 0) + 1.0 / (k + rank + 1)
            if mid not in memories:
                memories[mid] = result

        # Sort by fused score, apply strength and recency boosting
        now = time.time()
        ranked = sorted(
            scores.items(),
            key=lambda item: self._final_score(item[1], memories[item[0]], now),
            reverse=True,
        )[:limit]

        # Update access counts
        cur = self.conn.cursor()
        for mid, _ in ranked:
            cur.execute(
                "UPDATE memories SET access_count = access_count + 1, "
                "last_accessed = ? WHERE id = ?",
                (now, mid)
            )
        self.conn.commit()
        return [memories[mid] for mid, _ in ranked]

    def _final_score(self, rrf_score: float, memory: dict, now: float) -> float:
        """Combine RRF score with strength and recency."""
        age_hours = (now - memory["created_at"]) / 3600
        recency_boost = 1.0 / (1.0 + 0.01 * age_hours)
        return rrf_score * memory["strength"] * memory["importance"] * recency_boost
This is the same RRF formula we discussed in the hybrid search post. The key insight is that RRF does not try to normalize BM25 and vector scores into the same scale. Instead, it uses rank position, which is inherently comparable across different scoring systems. A result that ranks first in BM25 gets the same rank bonus as a result that ranks first in vector search, regardless of the absolute score magnitudes.
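Stripped of the database plumbing, the fusion step is only a few lines. The sketch below is a standalone version working on plain lists of IDs; the memory IDs and the two input rankings are invented for illustration.

```python
def rrf(rankings: list[list[str]], k: int = 60) -> list[tuple[str, float]]:
    """Reciprocal Rank Fusion over any number of ranked ID lists.

    Each list contributes 1 / (k + rank) per item, with rank starting at 1.
    Items missing from a list simply get no contribution from it.
    """
    scores: dict[str, float] = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    # Highest fused score first
    return sorted(scores.items(), key=lambda kv: kv[1], reverse=True)

# Hypothetical outputs of the two retrievers, best match first
bm25_ranking = ["m1", "m2", "m3"]
vector_ranking = ["m3", "m1", "m4"]

fused = rrf([bm25_ranking, vector_ranking])
fused_ids = [doc_id for doc_id, _ in fused]
```

Here m1 ends up first because it sits near the top of both lists, m3 comes second for the same reason, and the items found by only one retriever trail behind. No raw BM25 or distance value is ever compared.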
The _final_score method adds two practical signals on top of RRF. Memory strength, which decays over time via our background job, downweights stale memories. The recency boost prefers newer information. Be aware that both are multipliers on a score that varies very little: with k=60, a single RRF contribution only ranges from 1/61 at rank 1 to 1/90 at rank 30, while the recency factor already halves after 100 hours. As written, these multipliers can outweigh rank position, so a fresh but weakly relevant memory can surface above a highly relevant old one. If you want retrieval rank to dominate, lower the 0.01 rate or clamp the boost to a narrow band.
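It is worth checking the magnitudes before trusting any multiplier stacked on RRF. The following sketch reuses the constants from _final_score (k=60, a 0.01 per-hour recency rate) and compares how much each factor can swing the final score. The function names are my own.

```python
def rrf_term(rank: int, k: int = 60) -> float:
    """Contribution of a single ranked list at a 1-based rank."""
    return 1.0 / (k + rank)

def recency_boost(age_hours: float, rate: float = 0.01) -> float:
    """Same shape as the boost in _final_score."""
    return 1.0 / (1.0 + rate * age_hours)

# How much better is rank 1 than rank 30 in RRF terms?
rank_spread = rrf_term(1) / rrf_term(30)

# How much better is a 1-hour-old memory than a 30-day-old one?
recency_spread = recency_boost(1) / recency_boost(24 * 30)

# Concrete comparison: month-old top hit vs. fresh rank-30 hit
old_top_hit = rrf_term(1) * recency_boost(24 * 30)
fresh_weak_hit = rrf_term(30) * recency_boost(1)
```

With these constants the rank spread is about 1.5x while the recency spread is about 8x, so the fresh rank-30 hit outscores the month-old top hit. Whether that is what you want depends on the agent; the rate parameter is the knob to turn.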
Step 5: The Memory Lifecycle
Memories are not write-once. They need to decay, get demoted, get consolidated, and sometimes get deleted. Here is the lifecycle management:
class AgentMemory(AgentMemory):  # continuing the class
    # Decay constants per day; the base lambda differs by memory type
    DECAY_LAMBDAS = {"episodic": 0.003, "semantic": 0.001, "procedural": 0.0005}

    def run_decay(self):
        """Recompute strength of non-core memories from age and access count."""
        cur = self.conn.cursor()
        now = time.time()
        # Core memories are skipped so the job never demotes them
        cur.execute(
            "SELECT id, memory_type, access_count, created_at "
            "FROM memories WHERE tier != 'core'"
        )
        for row in cur.fetchall():
            age_days = (now - row["created_at"]) / 86400
            lam = self.DECAY_LAMBDAS.get(row["memory_type"], 0.002)

            # Exponential decay: strength = e^(-lambda * t), t in days.
            # Computed from age rather than from the stored strength, so
            # running the job repeatedly does not compound the decay.
            new_strength = float(np.exp(-lam * age_days))

            # Boost for accesses (each access offsets part of the decay)
            access_boost = min(row["access_count"] * 0.05, 0.3)
            new_strength = min(new_strength + access_boost, 1.0)

            # Demote weak memories to archived
            new_tier = "active" if new_strength > 0.3 else "archived"
            cur.execute(
                "UPDATE memories SET strength = ?, tier = ? WHERE id = ?",
                (new_strength, new_tier, row["id"])
            )
        self.conn.commit()

    def promote_to_core(self, memory_id: str):
        """Move a memory to core tier (always-loaded)."""
        cur = self.conn.cursor()
        cur.execute(
            "UPDATE memories SET tier = 'core' WHERE id = ? AND tier != 'core'",
            (memory_id,)
        )
        self.conn.commit()

    def delete(self, memory_id: str):
        """Remove a memory completely."""
        cur = self.conn.cursor()
        cur.execute("SELECT rowid FROM memories WHERE id = ?", (memory_id,))
        row = cur.fetchone()
        if row:
            # The vector table has no trigger, so clean it up by hand
            cur.execute("DELETE FROM memories_vec WHERE rowid = ?", (row["rowid"],))
            cur.execute("DELETE FROM memories WHERE id = ?", (memory_id,))
            self.conn.commit()
The decay rates are calibrated from the research we covered on memory tiers. Episodic memories (specific events) decay fastest because their usefulness drops quickly. Semantic memories (general facts) decay slowly because facts tend to remain relevant longer. Procedural memories (workflows and patterns) decay slowest because a learned procedure can be valuable for months.
The access boost is the mechanism that keeps frequently-retrieved memories alive. Every time a memory surfaces in a search result, its access count goes up. This acts as implicit reinforcement: if the agent keeps needing this information, the system assumes it is still valuable and slows its decay.
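To get a feel for what those lambda values mean in calendar time, here is a small worked calculation. It assumes the closed-form curve strength = e^(-λt) with t in days, no access boost, and the 0.3 archive threshold from the code; the helper names are my own.

```python
import math

# Per-day decay constants, taken from the lifecycle code
LAMBDAS_PER_DAY = {"episodic": 0.003, "semantic": 0.001, "procedural": 0.0005}
ARCHIVE_THRESHOLD = 0.3

def strength_at(age_days: float, lam: float) -> float:
    """Closed-form exponential decay, ignoring any access boost."""
    return math.exp(-lam * age_days)

def days_until(threshold: float, lam: float) -> float:
    """Solve e^(-lam * t) = threshold for t."""
    return math.log(1.0 / threshold) / lam

days_to_archive = {
    mem_type: days_until(ARCHIVE_THRESHOLD, lam)
    for mem_type, lam in LAMBDAS_PER_DAY.items()
}
half_life_days = {
    mem_type: math.log(2) / lam for mem_type, lam in LAMBDAS_PER_DAY.items()
}
```

Under these assumptions an untouched episodic memory takes roughly 400 days to cross the archive threshold, a semantic one roughly 1,200, and a procedural one roughly 2,400. If that is slower than you want, raise the lambdas or the threshold; the ratios between the three types matter more than the absolute values.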
Step 6: Agent Integration
The final piece is wiring this into an actual agent. Here is what that looks like with a generic chat loop:
class MemoryAgent:
    def __init__(self, memory: AgentMemory, llm_fn, system_prompt: str):
        self.memory = memory
        self.llm_fn = llm_fn  # callable(prompt: str) -> str
        self.system_prompt = system_prompt

    def chat(self, user_message: str, conversation_history: list | None = None) -> str:
        # 1. Retrieve relevant memories
        memories = self.memory.search(user_message, limit=5)
        memory_context = "\n".join(
            f"[{m['category']}] {m['content']}" for m in memories
        )

        # 2. Build the prompt with memory context
        full_prompt = (
            f"{self.system_prompt}\n\n"
            f"Relevant memories:\n{memory_context}\n\n"
            f"Conversation history:\n"
            f"{self._format_history(conversation_history or [])}\n\n"
            f"User: {user_message}"
        )

        # 3. Get the LLM response
        response = self.llm_fn(full_prompt)

        # 4. Extract and store new memories from the exchange
        self._extract_memories(user_message, response)
        return response

    def _format_history(self, history: list) -> str:
        """Render prior turns. Assumes each item is a (role, text) pair."""
        return "\n".join(f"{role}: {text}" for role, text in history)

    def _extract_memories(self, user_msg: str, agent_response: str):
        """Extract new memories from the conversation."""
        # In production, use an LLM call here to extract facts, preferences,
        # and decisions from the conversation. For simplicity, we store
        # the key exchanges as episodic memories.
        self.memory.write(
            content=f"User said: {user_msg[:200]}",
            memory_type="episodic",
            category="conversation",
            importance=0.5,
        )
The critical part that most tutorials skip is step 4: extracting memories from the conversation. In a real system, you would make a second LLM call here with a prompt like “extract any new facts, preferences, or decisions from this conversation exchange” and store each extracted fact as a separate semantic memory. This is what Mem0 does with its ADD-only extraction pipeline, and it is what gives the system its ability to accumulate knowledge over time rather than just storing raw conversation logs.
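As a rough illustration of what that second call could look like, here is a minimal sketch. Everything in it is an assumption rather than a fixed recipe: the prompt wording, the extract_facts helper, the choice of a JSON array as the output format, and the fake_llm stub that stands in for a real model call.

```python
import json

# Hypothetical prompt; tune the wording for your model
EXTRACTION_PROMPT = (
    "Extract any new facts, preferences, or decisions from this exchange.\n"
    "Reply with a JSON array of short standalone sentences, or [] if none.\n\n"
    "User: {user}\n"
    "Assistant: {agent}"
)

def extract_facts(llm_fn, user_msg: str, agent_response: str) -> list[str]:
    """Ask the model for discrete facts and parse them defensively.

    llm_fn is assumed to be callable(prompt: str) -> str.
    """
    prompt = EXTRACTION_PROMPT.format(user=user_msg, agent=agent_response)
    raw = llm_fn(prompt)
    try:
        items = json.loads(raw)
    except json.JSONDecodeError:
        return []  # model ignored the format; store nothing
    if not isinstance(items, list):
        return []
    # Keep only non-empty strings, trimmed
    return [item.strip() for item in items if isinstance(item, str) and item.strip()]

def fake_llm(prompt: str) -> str:
    """Stub standing in for a real model call."""
    return '["User prefers dark mode", "User deploys on Fridays"]'

facts = extract_facts(fake_llm, "I like dark mode. We ship on Fridays.", "Noted.")
```

Each returned string would then go through memory.write(content=fact, memory_type="semantic"), so the deduplication check gets a chance to drop anything already known. Returning an empty list on malformed output is deliberate: a missed memory is cheaper than a garbage one.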
The Gotcha: Retrieving Memories Is Easy, Deciding What to Store Is Hard
The most common mistake in building agent memory systems is over-indexing on retrieval quality while under-investing in the write path and lifecycle management.
You can spend weeks tuning your BM25 parameters, selecting the perfect embedding model, and implementing sophisticated reranking. None of it matters if your memory store is full of garbage. A memory system is only as good as what you put into it.
The three failure modes I see most often:
Storing everything. Some systems dump every conversation turn into memory. This seems safe but makes retrieval terrible because the signal-to-noise ratio collapses. A search for “deployment preferences” should not return ten copies of “here is the output of your test run.”
Never forgetting. Memory systems without decay accumulate stale information that slowly poisons retrieval. That your project used Node 14 in 2024 is actively harmful when you are running Node 22 in 2026. Build decay in from day one.
Ignoring the write path. The difference between a memory system that works and one that does not is usually the extraction logic. Raw conversation logs are nearly useless for retrieval. You need to extract discrete facts, normalize them, and store them individually. One conversation might produce five semantic memories, one procedural memory, and zero episodic memories. The extraction logic decides what is worth remembering.
Practical Takeaways
- Start with SQLite. FTS5 for keyword search, sqlite-vec for vector search, a single file for everything. You can migrate to Postgres or Milvus later if you outgrow it, but most agents never do.
- Use all-MiniLM-L6-v2 for embeddings unless you have a specific reason not to. It is small, fast, and good enough for agent memory.
- Deduplicate aggressively. Cosine similarity above 0.95 means the same memory stored twice.
- Implement RRF for hybrid search. It is simple, has a single constant (k) that rarely needs tuning, and sidesteps the fragile score normalization that other fusion heuristics depend on.
- Build decay into the system from the start. Use different rates for episodic, semantic, and procedural memories.
- Invest in your write path. The extraction logic is the most important and most overlooked part of any memory system.
- Keep a core tier of always-loaded memories for critical facts like the agent’s identity and the user’s permanent preferences.
What Is Next
We have now covered the full stack of agent memory, from theory to implementation. Over the next few posts, we will zoom into specific tools and projects in the space. Next up: a closer look at Mem0, the managed memory API that handles extraction, deduplication, and retrieval as a service.
Previous post: Anticipatory Memory: How AI Agents Are Learning to Predict What They Will Need to Remember