Skip to content
Go back

从 nanobot 源码看类 OpenClaw 智能体架构设计

Edit page

如果你用过 OpenClaw 这类”私人 AI 助手”,可能会好奇:为什么它能同时接入 Telegram、Discord、飞书等多个平台?为什么你告诉它”明天早上九点提醒我”, 第二天它真的会准时提醒?这背后是一套与传统”问答机器人”截然不同的架构设计。

本文通过分析 nanobot(一个受 OpenClaw 启发的 轻量级 AI Agent 框架)的源码,带你理解这类智能体的核心设计思想。

一、传统智能体 vs 类 OpenClaw 智能体

先看核心差异:

特性传统智能体类 OpenClaw 智能体
架构模式同步请求-响应异步消息总线
触发方式被动等待用户被动 + 主动(心跳/定时)
连接方式单一接口多渠道统一抽象
状态管理无状态/内存持久化会话
任务调度依赖外部 cron内置调度服务

用一句话概括:传统智能体是”一问一答”的函数调用,类 OpenClaw 智能体是”长期运行的后台服务”。

二、整体架构:消息总线解耦一切

nanobot 的核心是一个异步消息总线(MessageBus),它将渠道层(Telegram、Discord 等) 与代理核心(AgentLoop)完全解耦:

┌───────────────────────────────────────────────────────────────────────────┐
│                         nanobot Architecture                              │
├───────────────────────────────────────────────────────────────────────────┤
│  ┌──────────┐   ┌──────────┐   ┌──────────┐                               │
│  │ Telegram │   │ Discord  │   │ Feishu   │   ... more channels           │
│  └────┬─────┘   └────┬─────┘   └────┬─────┘                               │
│       │              │              │                                     │
│       └──────────────┼──────────────┘                                     │
│                      ▼                                                    │
│              ┌───────────────┐        ┌───────────────┐                   │
│              │  MessageBus   │◄──────►│  AgentLoop    │                   │
│              │(asyncio.Queue)│        │ (Core Engine) │                   │
│              └───────────────┘        └───────┬───────┘                   │
│                      ▲                        │                           │
│       ┌──────────────┼──────────────┐         │                           │
│       │              │              │         ▼                           │
│  ┌────┴────┐   ┌────┴────┐   ┌────┴────┐  ┌────────┐                      │
│  │Heartbeat│   │  Cron   │   │ Session │  │ Tools  │                      │
│  │ Service │   │ Service │   │ Manager │  │Registry│                      │
│  └─────────┘   └─────────┘   └─────────┘  └────────┘                      │
└───────────────────────────────────────────────────────────────────────────┘

2.1 MessageBus:用 asyncio.Queue 实现解耦

# nanobot/bus/queue.py
class MessageBus:
    def __init__(self):
        self.inbound: asyncio.Queue[InboundMessage] = asyncio.Queue()
        self.outbound: asyncio.Queue[OutboundMessage] = asyncio.Queue()

    async def publish_inbound(self, msg: InboundMessage) -> None:
        await self.inbound.put(msg)  # 非阻塞发布

    async def consume_inbound(self) -> InboundMessage:
        return await self.inbound.get()  # 阻塞消费

这个设计有几个好处:

  1. 渠道无关:Telegram、Discord 只需把消息丢进队列,不用关心后续处理
  2. 天然背压:队列满时自动阻塞,防止消息堆积爆内存
  3. 统一调度:AgentLoop 从队列消费,可以按优先级或时间排序处理

2.2 事件类型:统一的消息格式

所有渠道的消息都被标准化为 InboundMessageOutboundMessage

# nanobot/bus/events.py
@dataclass
class InboundMessage:
    channel: str              # telegram, discord, slack...
    sender_id: str            # User identifier
    chat_id: str              # Chat/channel identifier
    content: str              # Message text
    media: list[str]          # Media file paths
    metadata: dict[str, Any]  # Channel-specific data

    @property
    def session_key(self) -> str:
        return f"{self.channel}:{self.chat_id}"  # 会话标识

三、Channel 抽象:一套接口,多端接入

每个渠道(Telegram、Discord 等)实现统一的 BaseChannel 接口:

# nanobot/channels/base.py
class BaseChannel(ABC):
    @abstractmethod
    async def start(self) -> None:
        """Start the channel and begin listening for messages."""
        pass

    @abstractmethod
    async def send(self, msg: OutboundMessage) -> None:
        """Send a message through this channel."""
        pass

    async def _handle_message(self, sender_id, chat_id, content, ...):
        """Convert to InboundMessage and publish to bus."""
        await self.bus.publish_inbound(InboundMessage(...))

3.1 Telegram Channel 示例

以 Telegram 为例,使用 Long Polling 而非 WebSocket:

# nanobot/channels/telegram.py
class TelegramChannel(BaseChannel):
    async def start(self) -> None:
        # Build application with connection pool
        self._app = Application.builder().token(self.config.token).build()

        # Register handlers
        self._app.add_handler(MessageHandler(filters.TEXT, self._on_message))

        # Start polling (no webhook/public IP needed)
        await self._app.updater.start_polling(drop_pending_updates=True)

        # Keep running
        while self._running:
            await asyncio.sleep(1)

为什么用 Long Polling 而不是 WebSocket?

  • Telegram Bot API 官方推荐
  • 不需要公网 IP 和 HTTPS 证书
  • 实现简单,足够可靠

3.2 Typing 指示器:模拟”正在输入”

async def _typing_loop(self, chat_id: str) -> None:
    """Repeatedly send 'typing' action until cancelled."""
    while self._app:
        await self._app.bot.send_chat_action(chat_id=int(chat_id), action="typing")
        await asyncio.sleep(4)  # 每 4 秒刷新

这个细节让用户体验更自然——智能体在”思考”时,用户能看到 typing 状态。

3.3 WebSocket vs Long Polling:两种连接策略

不同平台支持的连接方式不同,nanobot 根据平台特性选择最优策略:

平台连接方式原因
TelegramLong PollingBot API 官方推荐,无需公网 IP
DiscordWebSocketDiscord Gateway 协议要求
飞书/LarkWebSocket长连接模式,无需配置 webhook

Discord Gateway:WebSocket 实现

Discord 使用自定义的 Gateway 协议,必须通过 WebSocket 连接:

# nanobot/channels/discord.py
class DiscordChannel(BaseChannel):
    """Discord channel using Gateway websocket."""

    async def start(self) -> None:
        while self._running:
            try:
                # Connect to Discord Gateway
                async with websockets.connect(self.config.gateway_url) as ws:
                    self._ws = ws
                    await self._gateway_loop()
            except Exception as e:
                logger.warning("Discord gateway error: {}", e)
                if self._running:
                    await asyncio.sleep(5)  # Reconnect after 5s

    async def _gateway_loop(self) -> None:
        """Main gateway loop: identify, heartbeat, dispatch events."""
        async for raw in self._ws:
            data = json.loads(raw)
            op = data.get("op")

            if op == 10:  # HELLO
                interval_ms = data["d"].get("heartbeat_interval", 45000)
                await self._start_heartbeat(interval_ms / 1000)
                await self._identify()
            elif op == 0 and data.get("t") == "MESSAGE_CREATE":
                await self._handle_message_create(data["d"])
            elif op == 7:  # RECONNECT
                break

Discord Gateway 心跳机制

async def _start_heartbeat(self, interval_s: float) -> None:
    """Gateway heartbeat - must send periodically or connection drops."""
    async def heartbeat_loop():
        while self._running and self._ws:
            payload = {"op": 1, "d": self._seq}  # HEARTBEAT opcode
            await self._ws.send(json.dumps(payload))
            await asyncio.sleep(interval_s)

    self._heartbeat_task = asyncio.create_task(heartbeat_loop())

关键点

  • Discord 要求客户端定期发送心跳(通常 45 秒间隔)
  • 如果心跳失败,服务器会主动断开连接
  • 客户端需要处理重连逻辑(opcode 7 RECONNECT)

飞书 WebSocket:长连接模式

飞书同样支持 WebSocket 长连接,无需配置公网 webhook:

# nanobot/channels/feishu.py
class FeishuChannel(BaseChannel):
    """Feishu/Lark channel using WebSocket long connection."""

    async def start(self) -> None:
        import lark_oapi as lark

        # Create event handler
        event_handler = lark.EventDispatcherHandler.builder() \
            .register_p2_im_message_receive_v1(self._on_message_sync) \
            .build()

        # Create WebSocket client
        self._ws_client = lark.ws.Client(
            self.config.app_id,
            self.config.app_secret,
            event_handler=event_handler,
        )

        # Start WebSocket in separate thread (lark SDK requirement)
        def run_ws():
            ws_loop = asyncio.new_event_loop()
            asyncio.set_event_loop(ws_loop)
            while self._running:
                try:
                    self._ws_client.start()
                except Exception as e:
                    logger.warning("Feishu WebSocket error: {}", e)
                if self._running:
                    time.sleep(5)

        self._ws_thread = threading.Thread(target=run_ws, daemon=True)
        self._ws_thread.start()

飞书 WebSocket 的特殊处理

# 飞书 SDK 在独立线程中运行,需要单独的事件循环
ws_loop = asyncio.new_event_loop()
asyncio.set_event_loop(ws_loop)

由于飞书 SDK 内部使用 asyncio.get_event_loop(),而主循环已经在运行, 需要在独立线程中创建新的 event loop,否则会报 “This event loop is already running” 错误。

3.4 Long Polling vs WebSocket 对比

Long Polling (Telegram):              WebSocket (Discord/Feishu):

Client                                Client
  │                                     │
  ├─── GET /getUpdates ────────────>    ├─── WebSocket Connect ───────>
  │<─── [messages or timeout] ──────    │<─── HELLO (opcode 10) ────────
  │                                     │
  ├─── GET /getUpdates ────────────>    ├─── HEARTBEAT ────────────────>
  │<─── [messages or timeout] ──────    │<─── HEARTBEAT ACK ────────────
  │                                     │
  ├─── GET /getUpdates ────────────>    │<─── MESSAGE_CREATE ───────────
  │<─── [messages] ─────────────────    │     (pushed by server)
  │                                     │
  ...                                   ...
特性Long PollingWebSocket
连接方式HTTP 轮询持久 TCP 连接
实时性依赖轮询间隔真正实时推送
资源消耗每次请求都有 HTTP 开销建立后开销低
断线处理下次轮询自动恢复需要重连逻辑
心跳要求必须定期发送
适用场景Bot API、简单场景高频消息、实时协作

nanobot 的选择策略

  • 优先使用平台推荐方式:Telegram 官方推荐 Long Polling
  • 必须遵守协议要求:Discord Gateway 必须用 WebSocket
  • 考虑部署环境:无公网 IP 时优先选择 Long Polling 或 WebSocket 长连接

四、心跳服务:让智能体”主动”起来

这是类 OpenClaw 智能体与传统机器人最大的区别。

4.1 设计目标

传统智能体只能被动等待用户触发。但现实场景中,我们需要:

  • “明天早上 9 点提醒我开会”
  • “每周五下午 5 点总结本周工作”
  • “如果股市跌破 3000 点就通知我”

这些都需要智能体主动在特定时间执行任务。

4.2 HeartbeatService 实现

# nanobot/heartbeat/service.py
class HeartbeatService:
    def __init__(self, interval_s: int = 30 * 60):  # 默认 30 分钟
        self.interval_s = interval_s

    async def _run_loop(self) -> None:
        while self._running:
            await asyncio.sleep(self.interval_s)
            await self._tick()

    async def _tick(self) -> None:
        # Phase 1: Read HEARTBEAT.md task file
        content = self._read_heartbeat_file()

        # Phase 2: Ask LLM to decide skip/run via virtual tool call
        action, tasks = await self._decide(content)

        if action == "run":
            # Phase 3: Execute and notify
            response = await self.on_execute(tasks)
            await self.on_notify(response)

4.3 LLM 驱动的决策

关键在于 _decide() 方法——它用 LLM 来判断”是否需要执行任务”:

_HEARTBEAT_TOOL = [{
    "type": "function",
    "function": {
        "name": "heartbeat",
        "parameters": {
            "properties": {
                "action": {"enum": ["skip", "run"]},
                "tasks": {"description": "Summary of active tasks"},
            }
        }
    }
}]

async def _decide(self, content: str) -> tuple[str, str]:
    response = await self.provider.chat_with_retry(
        messages=[
            {"role": "system", "content": "You are a heartbeat agent..."},
            {"role": "user", "content": f"Review HEARTBEAT.md:\n{content}"},
        ],
        tools=_HEARTBEAT_TOOL,
    )
    args = response.tool_calls[0].arguments
    return args.get("action", "skip"), args.get("tasks", "")

为什么用 LLM 而不是硬编码规则?

  • 自然语言理解:用户说”明天早上”,LLM 能理解是哪个时间点
  • 灵活性:无需为每种任务类型写规则
  • 容错性:即使表述不精确,LLM 也能”猜”出意图

五、定时任务:内置 Cron 服务

除了心跳,nanobot 还提供了完整的 Cron 调度系统。

5.1 支持三种调度方式

# nanobot/cron/types.py
@dataclass
class CronSchedule:
    kind: Literal["at", "every", "cron"]
    at_ms: int | None = None      # One-time: timestamp in ms
    every_ms: int | None = None   # Recurring: interval in ms
    expr: str | None = None       # Cron expression: "0 9 * * *"
    tz: str | None = None         # Timezone support

三种方式对应不同场景:

KindExampleUse Case
at2026-03-15T09:00:00One-time reminder
everyevery_ms=3600000Hourly check
cron0 9 * * 1-5Weekdays at 9am

5.2 调度器核心逻辑

# nanobot/cron/service.py
class CronService:
    async def start(self) -> None:
        self._load_store()            # Load from jobs.json
        self._recompute_next_runs()   # Calculate next run times
        self._arm_timer()             # Set async timer

    def _arm_timer(self) -> None:
        next_wake = self._get_next_wake_ms()
        delay_s = (next_wake - _now_ms()) / 1000

        async def tick():
            await asyncio.sleep(delay_s)
            await self._on_timer()

        self._timer_task = asyncio.create_task(tick())

    async def _on_timer(self) -> None:
        due_jobs = [j for j in jobs if now >= j.state.next_run_at_ms]
        for job in due_jobs:
            await self._execute_job(job)
        self._save_store()
        self._arm_timer()  # Re-arm for next job

5.3 自然语言创建任务

用户可以通过对话创建定时任务:

User: 每周五下午 5 点提醒我写周报
Agent: 好的,已创建定时任务,每周五 17:00 提醒你写周报。

底层通过 CronTool 实现:

# nanobot/agent/tools/cron.py
class CronTool(Tool):
    @property
    def parameters(self) -> dict:
        return {
            "properties": {
                "action": {"enum": ["add", "list", "remove"]},
                "message": {"description": "Reminder message"},
                "cron_expr": {"description": "Cron expression like '0 9 * * *'"},
                "tz": {"description": "IANA timezone (e.g. 'Asia/Shanghai')"},
            }
        }

六、会话管理:JSONL 持久化

6.1 Session 数据结构

# nanobot/session/manager.py
@dataclass
class Session:
    key: str                      # channel:chat_id
    messages: list[dict]          # Conversation history
    created_at: datetime
    updated_at: datetime
    last_consolidated: int = 0    # Memory consolidation marker

6.2 JSONL 存储格式

# sessions/telegram_12345.jsonl
{"_type":"metadata","key":"telegram:12345","created_at":"2026-03-14T10:00:00"}
{"role":"user","content":"帮我查天气","timestamp":"2026-03-14T10:01:00"}
{"role":"assistant","content":"好的,请问您在哪个城市?","timestamp":"2026-03-14T10:01:05"}

为什么用 JSONL 而不是 SQLite/Redis?

  • 简单:无需额外依赖
  • 可读:直接用文本编辑器查看
  • 增量追加:高效写入
  • 足够用:单用户场景下性能不是瓶颈

6.3 内存缓存 + 磁盘持久化

class SessionManager:
    def __init__(self, workspace: Path):
        self.sessions_dir = workspace / "sessions"
        self._cache: dict[str, Session] = {}  # In-memory cache

    def get_or_create(self, key: str) -> Session:
        if key in self._cache:
            return self._cache[key]
        session = self._load(key) or Session(key=key)
        self._cache[key] = session
        return session

    def save(self, session: Session) -> None:
        # Write to disk
        with open(self._get_session_path(session.key), "w") as f:
            for msg in session.messages:
                f.write(json.dumps(msg) + "\n")

七、AgentLoop:核心处理引擎

7.1 主循环逻辑

# nanobot/agent/loop.py
class AgentLoop:
    async def run(self) -> None:
        self._running = True
        while self._running:
            # 1. Wait for message from bus
            msg = await asyncio.wait_for(self.bus.consume_inbound(), timeout=1.0)

            # 2. Handle commands
            if msg.content == "/stop":
                await self._handle_stop(msg)
            elif msg.content == "/restart":
                await self._handle_restart(msg)
            else:
                # 3. Dispatch as async task (non-blocking)
                task = asyncio.create_task(self._dispatch(msg))
                self._active_tasks[msg.session_key].append(task)

    async def _process_message(self, msg: InboundMessage) -> OutboundMessage:
        # 1. Get or create session
        session = self.sessions.get_or_create(msg.session_key)

        # 2. Build context with history, memory, skills
        messages = self.context.build_messages(
            history=session.get_history(),
            current_message=msg.content,
        )

        # 3. Run agent loop (LLM calls + tool execution)
        final_content, tools_used, all_msgs = await self._run_agent_loop(messages)

        # 4. Save turn to session
        self._save_turn(session, all_msgs)
        self.sessions.save(session)

        return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, content=final_content)

7.2 工具执行循环

async def _run_agent_loop(self, messages: list[dict]) -> tuple[str | None, list[str], list[dict]]:
    iteration = 0
    while iteration < self.max_iterations:
        iteration += 1

        # Call LLM with tools
        response = await self.provider.chat_with_retry(
            messages=messages,
            tools=self.tools.get_definitions(),
        )

        if response.has_tool_calls:
            # Execute tools and append results
            for tool_call in response.tool_calls:
                result = await self.tools.execute(tool_call.name, tool_call.arguments)
                messages = self.context.add_tool_result(messages, tool_call.id, result)
        else:
            # No tool calls - return final response
            return response.content, tools_used, messages

八、上下文工程:如何构建高质量的 Prompt

前面讨论的都是”如何让智能体运行起来”,这一节我们关注”如何让智能体更聪明”。 答案在于上下文工程(Context Engineering)——系统性地构建 LLM 的输入上下文。

8.1 系统提示的分层结构

nanobot 的 ContextBuilder 将系统提示分为多个层次:

┌─────────────────────────────────────────────────────────────┐
│                      System Prompt                          │
├─────────────────────────────────────────────────────────────┤
│  1. Identity        # nanobot 🐈 - 核心身份与运行时信息      │
│  2. Bootstrap Files # AGENTS.md, SOUL.md, USER.md, TOOLS.md │
│  3. Memory          # MEMORY.md - 长期记忆                  │
│  4. Active Skills   # 始终加载的技能文档                     │
│  5. Skills Summary  # 可用技能列表(按需加载)               │
└─────────────────────────────────────────────────────────────┘
# nanobot/agent/context.py
class ContextBuilder:
    BOOTSTRAP_FILES = ["AGENTS.md", "SOUL.md", "USER.md", "TOOLS.md"]

    def build_system_prompt(self, skill_names: list[str] | None = None) -> str:
        parts = [self._get_identity()]           # 1. 身份
        parts.append(self._load_bootstrap_files()) # 2. 引导文件
        parts.append(f"# Memory\n{self.memory.get_memory_context()}")  # 3. 记忆
        parts.append(f"# Active Skills\n{always_skills}")  # 4. 常驻技能
        parts.append(f"# Skills\n{skills_summary}")        # 5. 技能列表
        return "\n\n---\n\n".join(parts)

8.2 引导文件:可定制的”人格”

nanobot 提供了四个引导文件,用户可以自由编辑:

文件用途示例内容
SOUL.md人格定义”Helpful and friendly, concise and to the point”
USER.md用户画像姓名、时区、偏好、工作背景
AGENTS.md行为指南如何处理定时提醒、心跳任务
TOOLS.md工具约束exec 的安全限制、输出截断规则
<!-- SOUL.md - 人格定义 -->
# Soul

I am nanobot 🐈, a personal AI assistant.

## Personality
- Helpful and friendly
- Concise and to the point
- Curious and eager to learn

## Values
- Accuracy over speed
- User privacy and safety
- Transparency in actions
<!-- USER.md - 用户画像 -->
# User Profile

## Basic Information
- **Name**: Alice
- **Timezone**: UTC+8
- **Language**: Chinese

## Work Context
- **Primary Role**: Backend Developer
- **Tools You Use**: Go, PostgreSQL, Kubernetes

8.3 记忆系统:双层架构

nanobot 采用双层记忆设计:

┌─────────────────────────────────────────────────────────────┐
│                      Memory System                          │
├──────────────────────┬──────────────────────────────────────┤
│   MEMORY.md          │   HISTORY.md                         │
│   (Long-term Facts)  │   (Grep-searchable Log)              │
├──────────────────────┼──────────────────────────────────────┤
│ - User prefers Go    │ [2026-03-14 10:00] USER: 帮我写个API │
│ - Timezone: UTC+8    │ [2026-03-14 10:05] ASSISTANT: 好的...│
│ - Project: nanobot   │ [2026-03-14 10:10] USER: 加个测试    │
│ - Key decisions...   │ ...                                  │
└──────────────────────┴──────────────────────────────────────┘

MEMORY.md:长期记忆,存储关于用户的事实和偏好

# Long-term Memory

## User Preferences
- Prefers concise responses
- Works in Go and Python
- Uses Kubernetes for deployment

## Active Projects
- nanobot: AI agent framework
- Personal blog: AstroPaper theme

## Important Dates
- 2026-03-01: Started using nanobot

HISTORY.md:时间线日志,支持 grep 搜索

[2026-03-14 10:00] USER: 帮我写一个 REST API
[2026-03-14 10:05] ASSISTANT [tools: write_file]: Created main.go
[2026-03-14 10:10] USER: 加个单元测试
[2026-03-14 10:12] ASSISTANT [tools: write_file, exec]: Created main_test.go, tests passed

8.4 记忆压缩:LLM 驱动的遗忘

随着对话增长,上下文窗口会溢出。nanobot 的解决方案是LLM 驱动的记忆压缩

# nanobot/agent/memory.py
class MemoryConsolidator:
    async def consolidate(self, messages: list[dict], provider, model) -> bool:
        prompt = f"""Process this conversation and call the save_memory tool.

## Current Long-term Memory
{current_memory or "(empty)"}

## Conversation to Process
{self._format_messages(messages)}
"""
        # LLM 调用 save_memory 工具,返回:
        # - history_entry: 写入 HISTORY.md 的日志条目
        # - memory_update: 更新后的 MEMORY.md 内容

压缩工具定义

_SAVE_MEMORY_TOOL = [{
    "type": "function",
    "function": {
        "name": "save_memory",
        "parameters": {
            "properties": {
                "history_entry": {
                    "description": "A paragraph summarizing key events. Start with [YYYY-MM-DD HH:MM]."
                },
                "memory_update": {
                    "description": "Full updated long-term memory. Include all existing facts plus new ones."
                }
            }
        }
    }
}]

为什么用 LLM 而不是简单截断?

  • 保留关键信息:用户偏好、重要决策
  • 去除噪音:闲聊、失败尝试、冗余内容
  • 语义理解:LLM 能判断”什么值得记住”

8.5 运行时上下文:注入元数据

每次用户消息前,nanobot 会注入运行时元数据:

def _build_runtime_context(channel: str | None, chat_id: str | None) -> str:
    now = datetime.now().strftime("%Y-%m-%d %H:%M (%A)")
    tz = time.strftime("%Z") or "UTC"
    lines = [f"Current Time: {now} ({tz})"]
    if channel and chat_id:
        lines += [f"Channel: {channel}", f"Chat ID: {chat_id}"]
    return "[Runtime Context — metadata only, not instructions]\n" + "\n".join(lines)

实际效果

[Runtime Context — metadata only, not instructions]
Current Time: 2026-03-14 10:30 (Friday) (CST)
Channel: telegram
Chat ID: 12345678

用户的消息内容...

设计意图

  • 明确区分”元数据”和”用户指令”,防止 prompt 注入
  • 提供当前时间,让智能体理解”明天”、“下周”等相对时间
  • 标注渠道来源,便于针对性响应

8.6 技能系统:按需加载的上下文

为了避免系统提示过长,nanobot 采用技能摘要 + 按需加载策略:

def build_system_prompt(self):
    # ...
    skills_summary = self.skills.build_skills_summary()
    parts.append(f"""# Skills

The following skills extend your capabilities. To use a skill, read its SKILL.md file.
Skills with available="false" need dependencies installed first.

| Skill | Available | Description |
|-------|-----------|-------------|
| weather | true | Get weather information |
| github | true | GitHub API operations |
| summarize | true | Summarize long documents |
""")

当智能体需要使用某个技能时,它会调用 read_file 工具读取对应的 SKILL.md, 获取详细的指令和示例。这种延迟加载策略显著减少了初始上下文长度。

8.7 上下文工程的核心原则

原则实践
分层组织身份 → 引导 → 记忆 → 技能,层次清晰
可定制性所有引导文件用户可编辑
动态压缩LLM 驱动的记忆整理
按需加载技能摘要 + 延迟读取详细文档
安全边界运行时元数据明确标记为”非指令”

九、总结:设计哲学

通过分析 nanobot 源码,可以提炼出类 OpenClaw 智能体的几个核心设计原则:

9.1 解耦至上

  • 渠道与代理解耦:通过 MessageBus
  • 调度与执行解耦:通过 HeartbeatService + CronService
  • 存储与逻辑解耦:通过 SessionManager

9.2 异步优先

  • asyncio 贯穿全局
  • 非阻塞 I/O:消息队列、定时器、HTTP 请求
  • 并发处理:每个消息作为独立 Task

9.3 持久化一切

  • 会话历史:JSONL 文件
  • 定时任务:jobs.json
  • 长期记忆:MEMORY.md + HISTORY.md

9.4 LLM 驱动的智能

  • 心跳决策:LLM 判断是否需要执行任务
  • 任务理解:自然语言转 cron 表达式
  • 记忆压缩:LLM 生成摘要和历史条目

十、与传统架构的对比图

Traditional Agent:                  OpenClaw-like Agent:

User Request ──────> Handler        Channel ──────> MessageBus
       │                               │                │
       ▼                               ▼                ▼
    Process                      AgentLoop <────── CronService
       │                          │    │          HeartbeatService
       ▼                          │    ▼                │
    Response                      │  Tools              │
       │                          │    │                │
       ▼                          ▼    ▼                ▼
      END                    Session Manager    Persistent Store

本质区别:传统架构是”请求-响应”的函数调用;类 OpenClaw 架构是”长期运行的服务进程”, 具备主动能力、状态持久化和多渠道接入能力。


参考资料


Edit page