Make LangGraph state graphs durable and aware of long-term context. Synap stores thread checkpoints so conversations survive restarts, and exposes a cross-thread BaseStore that any graph node can read and write.

Overview

This guide shows how to plug Synap into a LangGraph application to build graphs that:
  • Persist their thread state across processes and restarts
  • Resume any conversation by its thread_id without losing context
  • Share long-term memory (user preferences, facts, episodes) across all threads belonging to the same user
The Synap LangGraph integration ships two drop-in components — each one implements a native LangGraph interface, so you do not have to wrap or re-implement anything.
| Component | LangGraph interface | Purpose |
| --- | --- | --- |
| SynapCheckpointSaver | BaseCheckpointSaver | Thread-level checkpoint persistence per thread_id |
| SynapStore | BaseStore | Cross-thread long-term memory accessible from any node |

Setup

Install the package alongside LangGraph:
pip install maximem-synap-langgraph langgraph langchain-openai
Configure your API key. Generate one from the Synap Dashboard.
.env
SYNAP_API_KEY=synap_your_key_here
OPENAI_API_KEY=your-openai-api-key
Initialize the SDK once at application startup:
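from maximem_synap import MaximemSynapSDK

sdk = MaximemSynapSDK()
# initialize() is awaited, so call it from inside an async function
# (for example your app's startup hook), not at module top level.
await sdk.initialize()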
See SDK Initialization for the full lifecycle and configuration options.
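Before constructing the SDK, it can help to fail fast when a key is missing. The following is a minimal sketch using only the standard library. It assumes the keys from the .env snippet above have already been loaded into the process environment (how you load the file is up to your application), and the helper names missing_env and require_env are hypothetical, not part of the Synap SDK.

```python
import os


def missing_env(names, environ=None):
    """Return the names from `names` that are unset or blank in `environ`."""
    env = os.environ if environ is None else environ
    return [name for name in names if not env.get(name, "").strip()]


def require_env(names, environ=None):
    """Raise early, with a readable message, if any required variable is missing."""
    missing = missing_env(names, environ)
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))


# Typical use at startup, before the SDK is created:
# require_env(["SYNAP_API_KEY", "OPENAI_API_KEY"])
```

Passing environ explicitly keeps the check easy to unit-test without touching the real environment.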

Basic integration

The smallest useful integration replaces LangGraph’s in-memory checkpointer with SynapCheckpointSaver. Compile your graph with the saver, then invoke it as usual — every node transition is persisted, and the next invocation with the same thread_id resumes from the last checkpoint:
from langchain_core.messages import HumanMessage
from langgraph.graph import StateGraph
from synap_langgraph import SynapCheckpointSaver

saver = SynapCheckpointSaver(sdk=sdk, user_id="alice")

graph = StateGraph(...)
# ... add nodes and edges ...
app = graph.compile(checkpointer=saver)

config = {"configurable": {"thread_id": "thread-001"}}
result = await app.ainvoke(
    {"messages": [HumanMessage("Hello")]},
    config=config,
)
Restart your process, call ainvoke again with the same thread_id, and the graph picks up where it left off. Checkpoint retrieval failures degrade gracefully: the graph starts from an empty state and the error is logged.

This covers per-thread durability. To give your graph access to memories outside a single thread, layer in SynapStore below.
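Because resuming depends on passing exactly the same thread_id on every call, it may be worth building the config dict in one place. The sketch below is a small convenience wrapper around the {"configurable": {...}} shape used above. The function name thread_config is hypothetical.

```python
def thread_config(thread_id, **extra):
    """Build the config dict used above, keyed by thread_id.

    Extra keyword arguments (for example user_id) are merged into
    the "configurable" section unchanged.
    """
    if not isinstance(thread_id, str) or not thread_id.strip():
        raise ValueError("thread_id must be a non-empty string")
    return {"configurable": {"thread_id": thread_id, **extra}}


config = thread_config("thread-001")
config_with_user = thread_config("session-42", user_id="alice")
```

The validation catches an empty or None thread_id early, which would otherwise silently start a different conversation.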

Core concepts

Thread checkpoints

SynapCheckpointSaver implements LangGraph’s BaseCheckpointSaver interface. Every state transition the graph commits is sent to Synap and tagged with the active thread_id:
from synap_langgraph import SynapCheckpointSaver

saver = SynapCheckpointSaver(
    sdk=sdk,
    user_id="alice",
    customer_id="acme",   # optional — required for B2B instances
)

app = graph.compile(checkpointer=saver)
Each thread_id you pass through config["configurable"]["thread_id"] maps one-to-one to a Synap conversation. Replaying a thread is just another ainvoke with the same thread_id: Synap returns the stored checkpoint and LangGraph rehydrates state from it.

In addition to exact-thread replay, the saver supports fuzzy retrieval: when no checkpoint exists for a thread_id, Synap can return the closest semantically similar thread for the user. This is useful for “continue where I left off”-style flows that do not pin a thread ID up front.
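For flows that do pin a thread ID up front, one option is to derive it deterministically so that the same user and topic always map to the same thread after a restart. This is a sketch of one possible scheme using the standard library's uuid5. The namespace value and the stable_thread_id helper are assumptions for illustration, not something Synap prescribes.

```python
import uuid

# Hypothetical application namespace; any fixed UUID works as long as it never changes.
APP_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_DNS, "example.invalid")


def stable_thread_id(user_id, topic):
    """Derive the same thread_id for the same (user_id, topic) pair on every run."""
    # Note: the ":" separator assumes user_id itself contains no ":".
    return str(uuid.uuid5(APP_NAMESPACE, f"{user_id}:{topic}"))


first = stable_thread_id("alice", "q2-roadmap")
again = stable_thread_id("alice", "q2-roadmap")
other = stable_thread_id("alice", "hiring")
```

Because the ID is a pure function of its inputs, no lookup table is needed to find the thread again.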

Cross-thread memory

SynapStore implements BaseStore and gives every node in the graph access to long-term memory that spans all of the user’s threads:
from synap_langgraph import SynapStore

store = SynapStore(
    sdk=sdk,
    user_id="alice",
    customer_id="acme",
)

app = graph.compile(checkpointer=saver, store=store)
Inside a graph node, access the store through the store keyword argument that LangGraph injects:
async def assistant_node(state, config, *, store):
    # Retrieve cross-thread memories scoped to this user
    memories = await store.asearch(
        ("user", "alice"),
        query="project preferences",
    )

    # Write a new memory that any future thread can read
    await store.aput(
        ("user", "alice"),
        key="pref-001",
        value={"content": "Prefers async communication", "type": "preference"},
    )
    return state
The namespace tuple (("user", "alice") above) scopes reads and writes to a particular memory partition. Use ("user", user_id) for per-user memory and ("customer", customer_id) for tenant-wide memory. Read Memory Scopes for the full hierarchy.
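Since a typo in a namespace tuple silently reads or writes a different partition, it can be safer to build the tuples through small helpers. A minimal sketch follows. The helper names are hypothetical, and only the two tuple shapes named above are covered.

```python
def _checked(value, label):
    """Return the stripped value, or raise if it is not a usable identifier."""
    if not isinstance(value, str) or not value.strip():
        raise ValueError(f"{label} must be a non-empty string")
    return value.strip()


def user_namespace(user_id):
    """Namespace tuple for per-user memory: ("user", user_id)."""
    return ("user", _checked(user_id, "user_id"))


def customer_namespace(customer_id):
    """Namespace tuple for tenant-wide memory: ("customer", customer_id)."""
    return ("customer", _checked(customer_id, "customer_id"))
```

Inside a node these would replace the literal tuples, for example store.asearch(user_namespace(user_id), query=...).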

Complete example: agent that remembers across threads

The following graph wires both components together. Each thread is durable on its own, and the assistant can recall facts from earlier conversations the same user had under a different thread_id:
from typing import TypedDict, Annotated
from langchain_core.messages import HumanMessage, AIMessage, AnyMessage, SystemMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from synap_langgraph import SynapCheckpointSaver, SynapStore


class AgentState(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]


llm = ChatOpenAI(model="gpt-4o")


async def recall(state, config, *, store):
    """Inject cross-thread memories as a system message."""
    user_id = config["configurable"]["user_id"]
    last_user = state["messages"][-1].content

    memories = await store.asearch(("user", user_id), query=last_user)
    context = "\n".join(f"- {m.value['content']}" for m in memories) or "No prior memory."

    # Use SystemMessage so the injected context is not mistaken for a user turn.
    return {"messages": [SystemMessage(content=f"[memory]\n{context}")]}


async def respond(state):
    reply = await llm.ainvoke(state["messages"])
    return {"messages": [reply]}


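async def remember(state, config, *, store):
    """Persist the last user/assistant exchange as a durable memory."""
    user_id = config["configurable"]["user_id"]
    ai_msg = state["messages"][-1]

    # recall appends a "[memory]" message after the user turn, so messages[-2]
    # is not the user's message. Walk backwards to find the real user turn.
    user_msg = next(
        (m for m in reversed(state["messages"])
         if isinstance(m, HumanMessage) and not str(m.content).startswith("[memory]")),
        None,
    )
    if user_msg is None:
        return {}

    await store.aput(
        ("user", user_id),
        key=f"turn-{ai_msg.id}",
        value={"content": f"User: {user_msg.content}\nAssistant: {ai_msg.content}"},
    )
    return {}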


graph = StateGraph(AgentState)
graph.add_node("recall", recall)
graph.add_node("respond", respond)
graph.add_node("remember", remember)
graph.add_edge(START, "recall")
graph.add_edge("recall", "respond")
graph.add_edge("respond", "remember")
graph.add_edge("remember", END)


saver = SynapCheckpointSaver(sdk=sdk, user_id="alice", customer_id="acme")
store = SynapStore(sdk=sdk, user_id="alice", customer_id="acme")

app = graph.compile(checkpointer=saver, store=store)

config = {"configurable": {"thread_id": "session-42", "user_id": "alice"}}
result = await app.ainvoke(
    {"messages": [HumanMessage("What did we agree on for the Q2 roadmap?")]},
    config=config,
)
Three things to notice in this pattern:
  1. SynapCheckpointSaver lets the graph resume mid-conversation by thread_id — even after a restart.
  2. SynapStore is read in recall and written in remember, so each thread enriches a shared user-level memory pool.
  3. Memory and thread state are independent — you can drop the store and keep just the checkpointer (or vice versa) and the graph still works.
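The recall node builds its context string from m.value["content"], which raises a KeyError if a stored value has no "content" field. A more defensive formatter might look like the sketch below. It assumes search results expose a .value dict, as the example above does, and uses SimpleNamespace objects as stand-ins for real store results. The format_memories name and the limit parameter are hypothetical.

```python
from types import SimpleNamespace


def format_memories(items, limit=5, empty="No prior memory."):
    """Render up to `limit` memories as a bulleted string, skipping malformed entries."""
    lines = []
    for item in items:
        value = getattr(item, "value", None)
        content = value.get("content") if isinstance(value, dict) else None
        if content:
            lines.append(f"- {content}")
        if len(lines) >= limit:
            break
    return "\n".join(lines) or empty


# Stand-ins for store search results; the second one lacks a "content" field.
items = [
    SimpleNamespace(value={"content": "Prefers async communication"}),
    SimpleNamespace(value={"type": "preference"}),
]
context = format_memories(items)
```

Capping the number of memories also keeps the injected context from growing without bound as the user's memory pool fills up.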

Advanced patterns

Multi-tenant scoping

Both components accept the same scoping triple — user_id, optional customer_id, optional conversation_id — and namespace store entries with tuples like ("user", user_id) or ("customer", customer_id). customer_id is required on B2B Synap instances and ignored on single-tenant ones. See Memory Scopes.
store = SynapStore(sdk=sdk, user_id="alice", customer_id="acme")

# User-scoped memory
await store.aput(("user", "alice"), key="k1", value={...})

# Customer-scoped memory — readable by every user in the tenant
await store.aput(("customer", "acme"), key="k2", value={...})
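Because both components take the same scoping arguments, one way to keep them in sync is to hold the triple in a single object and unpack it into each constructor. This is a sketch only: SynapScope is a hypothetical helper, and the keyword names are taken from the scoping triple described above.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class SynapScope:
    """Hypothetical container for the user_id / customer_id / conversation_id triple."""

    user_id: str
    customer_id: Optional[str] = None
    conversation_id: Optional[str] = None

    def kwargs(self):
        """Keyword arguments with unset optional fields dropped."""
        pairs = {
            "user_id": self.user_id,
            "customer_id": self.customer_id,
            "conversation_id": self.conversation_id,
        }
        return {name: value for name, value in pairs.items() if value is not None}


scope = SynapScope(user_id="alice", customer_id="acme")
scope_kwargs = scope.kwargs()
# Intended use (not executed here):
#   saver = SynapCheckpointSaver(sdk=sdk, **scope.kwargs())
#   store = SynapStore(sdk=sdk, **scope.kwargs())
```

Making the dataclass frozen prevents a scope from being mutated between building the saver and building the store.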

Streaming with persistence

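The checkpointer is compatible with astream, so streaming and durable state are not mutually exclusive. The call below uses the default stream mode of a compiled StateGraph, which emits one update per node rather than individual tokens. For token-level output, LangGraph offers other stream modes such as stream_mode="messages".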
async for event in app.astream(
    {"messages": [HumanMessage("Hi")]},
    config={"configurable": {"thread_id": "session-42", "user_id": "alice"}},
):
    print(event)
Each emitted event corresponds to a node transition that is also persisted by SynapCheckpointSaver.
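To make the event loop above a little more concrete, the sketch below collects the node names from a stream of events. It assumes each event is a dict keyed by node name, which is the shape LangGraph's per-node update mode is expected to produce. The fake_stream generator is a stand-in for app.astream(...) so the example runs without a graph.

```python
import asyncio


async def nodes_seen(stream):
    """Collect node names, in arrival order, from an async stream of update events."""
    seen = []
    async for event in stream:
        if isinstance(event, dict):
            seen.extend(event.keys())
    return seen


async def fake_stream():
    # Stand-in for app.astream(...); the event shape here is an assumption.
    yield {"recall": {"messages": ["[memory] ..."]}}
    yield {"respond": {"messages": ["Hello!"]}}
    yield {"remember": {}}


order = asyncio.run(nodes_seen(fake_stream()))
```

With a real graph, nodes_seen(app.astream(inputs, config=config)) would be called from inside an async function instead of asyncio.run at module level.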

Failure semantics

SynapCheckpointSaver and SynapStore follow the Synap integration contract:
  • Read failures degrade gracefully: aget, asearch, and checkpoint loads return empty results and log an error, so the graph keeps running.
  • Write failures surface explicitly: aput and checkpoint commits raise SynapIntegrationError so callers know if persistence failed.
This is by design: a transient outage should never break a user-facing turn, but a silent checkpoint loss would be invisible and dangerous.
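Since write failures raise, callers can decide how to react. One common reaction is a bounded retry with exponential backoff, sketched below. The import path for SynapIntegrationError is not shown in this guide, so the exception type is passed in as a parameter. The put_with_retry helper and FlakyStore stand-in are hypothetical and only illustrate the pattern.

```python
import asyncio


async def put_with_retry(store, namespace, key, value, *,
                         attempts=3, base_delay=0.2, retry_on=(Exception,)):
    """Call store.aput, retrying with exponential backoff on `retry_on` errors.

    Returns the attempt number that succeeded; re-raises after the last attempt.
    """
    for attempt in range(1, attempts + 1):
        try:
            await store.aput(namespace, key=key, value=value)
            return attempt
        except retry_on:
            if attempt == attempts:
                raise
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))


class FlakyStore:
    """Stand-in store whose first `failures` writes raise, for demonstration only."""

    def __init__(self, failures):
        self.failures = failures
        self.calls = 0
        self.data = {}

    async def aput(self, namespace, key, value):
        self.calls += 1
        if self.calls <= self.failures:
            raise RuntimeError("transient write failure")
        self.data[(namespace, key)] = value


flaky = FlakyStore(failures=2)
succeeded_on = asyncio.run(
    put_with_retry(flaky, ("user", "alice"), "pref-001", {"content": "x"}, base_delay=0)
)
```

In a real node you would likely pass retry_on=(SynapIntegrationError,) so that unrelated bugs are not retried.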

Next steps

LangChain

Memory, retriever, and tools for LangChain chains and agents.

Memory Scopes

How user_id, customer_id, and namespaces interact across stores.

Context Fetch

The retrieval API that powers SynapStore.asearch.

Ingestion

Direct ingestion API for pipelines that need finer control than the store.