Gary Wu

Multi-Machine AI Agent Deployment


Org Status: 🟢 Active Cloudflare: N/A Last Audited: 2026-04-28


You have AI agents running in the cloud. You have local machines with GPUs, screens, and shells. The cloud can reason but cannot act. The machines can act but cannot coordinate. The missing piece is a deployment and coordination layer that registers machines, discovers capabilities, and routes work to the right node, all installed with a single command.

This article covers the complete architecture for deploying a distributed AI agent platform across multiple machines, from the install script that bootstraps a new node, through the registration protocol that advertises its capabilities, to the pipeline system that chains nodes into workflows.

What you'll learn:


  1. The Problem
  2. Architecture Overview
  3. Core Concepts
  4. The Install Script
  5. Machine Registration Protocol
  6. Capability Discovery
  7. Node / Hub / Chain Architecture
  8. Security
  9. Patterns
  10. Small Examples
  11. Real-World Multi-Machine AI Systems
  12. Comparisons
  13. Anti-Patterns
  14. References

The Problem

Building a distributed AI agent platform means solving three problems simultaneously:

1. Installation is fragmented. You have a cloud control plane (Cloudflare Workers, Durable Objects), local machine connectors (TypeScript daemon over WebSocket), a macOS HUD app, and various worker processes. Each one has its own install procedure, its own configuration, its own auth setup. Adding a new machine means 20 minutes of manual steps that nobody documents.

2. Machines don't know about each other. Machine A has a GPU and can run inference. Machine B has a screen and can display status. Machine C has filesystem access and can execute code. But there is no registry, no discovery, no way for the cloud coordinator to say "I need a machine with GPU capability" and get routed to the right one.

3. Pipelines are hardcoded. When you need machine A to run inference, machine B to validate the output, and machine C to deploy the result, you wire it up manually. There is no abstraction for "chain these three capabilities together, routing each step to whichever machine can handle it."

These problems compound. Without automated installation, you can't easily add machines. Without capability discovery, you can't route work. Without pipeline abstractions, you can't compose multi-step workflows. The result is a system that works on one machine but collapses the moment you try to scale to two.

What changes if you get this right

A new machine joins your agent network with a single command:

curl -sfL https://install.atlas.dev | sh -s -- \
  --role node \
  --hub https://api-mom.garywu.dev \
  --token "$ATLAS_TOKEN"

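Within 30 seconds, the machine has a connector binary installed, a persistent machine ID and keypair, a supervised service, and a registration with the hub.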

Other machines in the network immediately see the new node and can route tasks to it. The cloud coordinator's capability map updates in real time. Pipelines that were waiting for a "gpu-inference" capability suddenly have a machine to run on.


Architecture Overview

The system has three layers:

┌─────────────────────────────────────────────────────────────────┐
│  CLOUD LAYER (Cloudflare Workers + Durable Objects)             │
│                                                                 │
│  ┌──────────────┐  ┌──────────────┐  ┌────────────────────────┐ │
│  │ Atlas Center │  │ Board Agents │  │ Pipeline Engine        │ │
│  │ (Registry DO)│  │ (C-suite DOs)│  │ (Workflow Orchestrator)│ │
│  │              │  │              │  │                        │ │
│  │ - machine map│  │ - perceive() │  │ - DAG execution        │ │
│  │ - capability │  │ - reason()   │  │ - step routing         │ │
│  │   index      │  │ - dispatch() │  │ - retry/checkpoint     │ │
│  │ - presence   │  │              │  │                        │ │
│  └──────┬───────┘  └──────┬───────┘  └─────────┬──────────────┘ │
│         │                 │                    │                │
│         └─────────────┬───┴────────────────────┘                │
│                       │                                         │
│               ┌───────┴────────┐                                │
│               │   API Mom      │  ← central proxy/registry      │
│               │ (cost routing) │                                │
│               └───────┬────────┘                                │
└───────────────────────┼─────────────────────────────────────────┘
                        │
               ═══ WebSocket ═══  (persistent, bidirectional)
                        │
┌───────────────────────┼─────────────────────────────────────────┐
│  LOCAL LAYER (per machine)                                      │
│               ┌───────┴────────┐                                │
│               │  Connector     │  ← installed by install script │
│               │                │                                │
│               │ - register()   │  → POST /v1/machines/register  │
│               │ - heartbeat()  │  → POST /v1/machines/heartbeat │
│               │ - capabilities │  → hud-notify, gpu-inference,  │
│               │ - ws handler   │    code-exec, macos-notify...  │
│               └──┬──────────┬──┘                                │
│                  │          │                                   │
│           ┌──────┴──┐  ┌────┴───────┐                           │
│           │  HUD    │  │ Workers    │                           │
│           │ (notch) │  │ (GPU, etc) │                           │
│           └─────────┘  └────────────┘                           │
└─────────────────────────────────────────────────────────────────┘

Three machine roles:

| Role | Description | Examples |
| --- | --- | --- |
| Node | Worker machine. Registers capabilities, accepts dispatched tasks, returns results. | GPU inference box, macOS dev machine, headless Linux server |
| Hub | Coordinator. Maintains the machine registry, routes work, manages pipelines. | Cloud-hosted (Atlas Center DO), or a local machine acting as mesh coordinator |
| Chain | Pipeline definition. Not a machine role but a workflow spec: an ordered sequence of capabilities routed to nodes via the hub. | "inference → validate → deploy", "scrape → summarize → notify" |
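
The relationship between the three roles can be sketched in a few lines. This is an illustration only, not the Atlas routing code: `NodeEntry`, `ResolvedStep`, `resolveChain`, and the "first matching node wins" strategy are all assumptions made for the example.

```typescript
// Hypothetical shapes for illustration; a real hub registry carries far more state.
interface NodeEntry {
  machineId: string;
  capabilities: string[];
}

interface ResolvedStep {
  capability: string;
  machineId: string;
}

/**
 * Resolve a chain (an ordered list of capability names) to concrete nodes.
 * Picks the first registered node that advertises each capability; a real
 * hub would presumably also weigh load, locality, and cost.
 */
function resolveChain(chain: string[], nodes: NodeEntry[]): ResolvedStep[] {
  return chain.map((capability) => {
    const node = nodes.find((n) => n.capabilities.includes(capability));
    if (!node) {
      throw new Error(`No registered node offers capability "${capability}"`);
    }
    return { capability, machineId: node.machineId };
  });
}

const nodes: NodeEntry[] = [
  { machineId: "gpu-box", capabilities: ["inference"] },
  { machineId: "ci-runner", capabilities: ["validate", "deploy"] },
];

const plan = resolveChain(["inference", "validate", "deploy"], nodes);
console.log(plan.map((step) => `${step.capability}@${step.machineId}`).join(" → "));
// inference@gpu-box → validate@ci-runner → deploy@ci-runner
```

The chain itself never names a machine. That indirection is what lets a node join or leave without anyone editing the pipeline definition.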

Core Concepts

Machine Identity

Every machine in the network has a unique identity: a combination of a machine ID, a cryptographic keypair, and a set of metadata that describes what the machine is and what it can do.

interface MachineIdentity {
  /** Unique machine identifier (typically hostname or UUID) */
  machineId: string;

  /** Human-readable label */
  label: string;

  /** Ed25519 public key for this machine */
  publicKey: string;

  /** Machine role in the network */
  role: "node" | "hub" | "hybrid";

  /** When this identity was created */
  createdAt: string;

  /** Network addresses where this machine can be reached */
  addresses: {
    /** Local network address (mDNS or static IP) */
    local?: string;
    /** Tailscale/WireGuard mesh address */
    mesh?: string;
    /** Public address (if directly reachable) */
    public?: string;
  };
}

The machine ID is generated during installation and persisted to ~/.atlas/machine-id. It does not change across restarts, reinstalls, or network changes. The keypair is generated fresh on first install and stored in ~/.atlas/keys/.

Key insight: Machine identity must survive reboots, network changes, and even OS upgrades. Tying identity to hardware (like a MAC address) is fragile. Tying it to a hostname is ambiguous. The safest approach is a generated UUID stored on disk, paired with a keypair that proves ownership.
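
A minimal sketch of the "generated UUID stored on disk" approach follows. The helper name `loadOrCreateMachineId` is hypothetical and the demo writes to a throwaway temp directory instead of the real ~/.atlas; only the `machine-id` file name comes from the text above.

```typescript
import { randomUUID } from "node:crypto";
import * as fs from "node:fs";
import * as os from "node:os";
import * as path from "node:path";

/**
 * Return the machine ID stored in `dir`, generating and persisting a new
 * UUID on first use. A sketch of the idea, not the connector's own code.
 */
function loadOrCreateMachineId(dir: string): string {
  const file = path.join(dir, "machine-id");
  if (fs.existsSync(file)) {
    const existing = fs.readFileSync(file, "utf8").trim();
    if (existing) return existing; // reuse: identity must not change across restarts
  }
  const id = randomUUID();
  fs.mkdirSync(dir, { recursive: true });
  // Owner-only permissions: the ID is not secret, but nothing else should rewrite it.
  fs.writeFileSync(file, id + "\n", { mode: 0o600 });
  return id;
}

// Demo against a temporary directory rather than the real config directory.
const demoDir = fs.mkdtempSync(path.join(os.tmpdir(), "atlas-demo-"));
const firstId = loadOrCreateMachineId(demoDir);
const secondId = loadOrCreateMachineId(demoDir);
console.log(firstId === secondId); // true: the second call reads the persisted ID
```

Because the ID depends on nothing but the file, a hostname change, a new network interface, or an OS upgrade leaves it untouched.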

Capabilities

A capability is a named, typed unit of work that a machine can perform. It has a name, an input schema, an output schema, and metadata about resource requirements.

interface CapabilitySpec {
  /** Unique capability name (e.g., "gpu-inference", "hud-notify") */
  name: string;

  /** Human-readable description */
  description: string;

  /** Zod-compatible JSON schema for the input */
  inputSchema: Record<string, unknown>;

  /** Zod-compatible JSON schema for the output */
  outputSchema?: Record<string, unknown>;

  /** Resource requirements to execute this capability */
  resources?: {
    /** Minimum CPU cores needed */
    minCpuCores?: number;
    /** Minimum memory in GB */
    minMemoryGb?: number;
    /** Requires GPU */
    gpu?: boolean;
    /** Requires display (for HUD, notifications) */
    display?: boolean;
    /** Requires filesystem access */
    filesystem?: boolean;
    /** Requires network access */
    network?: boolean;
    /** Estimated execution time in seconds */
    estimatedDurationSeconds?: number;
  };

  /** Tags for filtering and routing */
  tags?: string[];

  /** Maximum concurrent executions on this machine */
  maxConcurrency?: number;
}

A minimal working example of registering capabilities:

import { execSync } from "node:child_process";
import * as os from "node:os";

const capabilities: CapabilitySpec[] = [
  {
    name: "hud-notify",
    description: "Write a message to the local HUD status queue",
    inputSchema: {
      type: "object",
      properties: {
        severity: { type: "string", enum: ["green", "yellow", "red"] },
        message: { type: "string" },
      },
      required: ["severity", "message"],
    },
    resources: { display: true },
    maxConcurrency: 10,
  },
  {
    name: "jane-status",
    description: "Return local machine status (CPU, memory, uptime)",
    inputSchema: { type: "object" },
    resources: {},
    maxConcurrency: 50,
  },
];

// Placeholder GPU probe (an assumption for this example): treats a working
// `nvidia-smi` as evidence of a usable GPU. Swap in a probe that fits your hardware.
function detectGpu(): boolean {
  try {
    execSync("nvidia-smi -L", { stdio: "ignore" });
    return true;
  } catch {
    return false;
  }
}

// Capabilities are declared statically, but resource availability
// is checked dynamically at registration time.
function getAvailableCapabilities(): CapabilitySpec[] {
  // Rough heuristic: assume a macOS machine has a display attached.
  const hasDisplay = os.platform() === "darwin";
  const hasGpu = detectGpu();
  return capabilities.filter((cap) => {
    if (cap.resources?.display && !hasDisplay) return false;
    if (cap.resources?.gpu && !hasGpu) return false;
    return true;
  });
}

Key insight: Capabilities are not the same as "what software is installed." A machine might have ollama installed but be out of VRAM. Capabilities must be dynamically validated at registration time and continuously re-evaluated during heartbeats. A capability that was available 60 seconds ago might not be available now.
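
To make the re-evaluation concrete, here is a small sketch that filters declared capabilities against the latest resource snapshot. The trimmed `Cap` and `Snapshot` types, the `availableNow` function, and the 2048 MB free-VRAM threshold are assumptions for illustration; the article does not specify how "out of VRAM" is decided.

```typescript
// Trimmed-down local copies of the article's types so the sketch stands alone.
interface Cap {
  name: string;
  resources?: { gpu?: boolean; minMemoryGb?: number };
}

interface Snapshot {
  freeMemoryGb: number;
  gpus?: { freeMemoryMb: number }[];
}

// Assumed threshold: a GPU counts as usable only with this much free VRAM.
const MIN_FREE_VRAM_MB = 2048;

/** Names of the capabilities that the machine can actually serve right now. */
function availableNow(caps: Cap[], snap: Snapshot): string[] {
  return caps
    .filter((cap) => {
      const req = cap.resources ?? {};
      if (req.minMemoryGb !== undefined && snap.freeMemoryGb < req.minMemoryGb) {
        return false;
      }
      if (req.gpu) {
        const gpus = snap.gpus ?? [];
        if (!gpus.some((gpu) => gpu.freeMemoryMb >= MIN_FREE_VRAM_MB)) return false;
      }
      return true;
    })
    .map((cap) => cap.name);
}

const declared: Cap[] = [
  { name: "gpu-inference", resources: { gpu: true, minMemoryGb: 8 } },
  { name: "jane-status" },
];

// Same machine, one heartbeat apart: the GPU fills up in between.
const earlier: Snapshot = { freeMemoryGb: 16, gpus: [{ freeMemoryMb: 20000 }] };
const later: Snapshot = { freeMemoryGb: 16, gpus: [{ freeMemoryMb: 512 }] };

console.log(availableNow(declared, earlier)); // both capabilities
console.log(availableNow(declared, later)); // only "jane-status"
```

Running this filter on every heartbeat means the hub stops routing GPU work to a saturated machine within one heartbeat interval.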

Registration

Registration is the process by which a machine announces itself to the hub, providing its identity, capabilities, and resource snapshot. The hub stores this in its machine registry and makes it available for capability routing.

interface MachineRegistration {
  /** Machine identity */
  machineId: string;
  label: string;

  /** WebSocket endpoint for dispatching work */
  endpoint: string;

  /** Capabilities this machine offers */
  capabilities: CapabilitySpec[];

  /** Current resource snapshot */
  resources: ResourceSnapshot;

  /** How long this registration is valid (seconds) */
  ttlSeconds: number;

  /** Protocol version */
  protocolVersion: string;
}

interface ResourceSnapshot {
  platform: string;
  arch: string;
  cpus: number;
  memoryGb: number;
  freeMemoryGb: number;
  loadAvg: [number, number, number];
  uptimeSeconds: number;
  gpus?: GpuInfo[];
  diskFreeGb?: number;
}

interface GpuInfo {
  name: string;
  memoryMb: number;
  freeMemoryMb: number;
  utilization: number;
}
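
Most of a `ResourceSnapshot` can be filled from Node's built-in `os` module. The sketch below is an assumption about how a connector might collect it; `collectSnapshot` and `roundToTenth` are hypothetical names, and the optional `gpus` and `diskFreeGb` fields are left out because they need vendor tools or filesystem probes.

```typescript
import * as os from "node:os";

// Local copy of the required fields from the article's ResourceSnapshot.
interface ResourceSnapshot {
  platform: string;
  arch: string;
  cpus: number;
  memoryGb: number;
  freeMemoryGb: number;
  loadAvg: [number, number, number];
  uptimeSeconds: number;
}

const BYTES_PER_GB = 1024 * 1024 * 1024;

function roundToTenth(value: number): number {
  return Math.round(value * 10) / 10;
}

/** Collect the cheap, always-available parts of a resource snapshot. */
function collectSnapshot(): ResourceSnapshot {
  // os.loadavg() returns the 1, 5, and 15 minute averages (all zeros on Windows).
  const [one, five, fifteen] = os.loadavg();
  return {
    platform: os.platform(),
    arch: os.arch(),
    cpus: os.cpus().length,
    memoryGb: roundToTenth(os.totalmem() / BYTES_PER_GB),
    freeMemoryGb: roundToTenth(os.freemem() / BYTES_PER_GB),
    loadAvg: [one, five, fifteen],
    uptimeSeconds: Math.floor(os.uptime()),
  };
}

const snapshot = collectSnapshot();
console.log(snapshot);
```

Everything here is a cheap in-process call, which is what makes it reasonable to resend the whole snapshot on every heartbeat.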

Registration is idempotent. Calling register with the same machine ID updates the existing registration rather than creating a duplicate. The TTL ensures that if a machine crashes without deregistering, its registration expires and it is removed from the capability index.

Key insight: Registration must be idempotent and TTL-based. Machines crash. Networks partition. Graceful deregistration (DELETE on shutdown) is best-effort. The TTL is the real garbage collector. Set it to 3x your heartbeat interval: if you heartbeat every 60s, set TTL to 180s.
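
The two properties, idempotent upsert and TTL expiry, fit in a small in-memory sketch. `MachineRegistry` and its methods are hypothetical and stand in for whatever the hub actually stores; the injectable clock exists only so expiry can be demonstrated without waiting.

```typescript
interface RegistryEntry {
  machineId: string;
  label: string;
  expiresAt: number; // epoch milliseconds
}

class MachineRegistry {
  private readonly entries = new Map<string, RegistryEntry>();
  private readonly now: () => number;

  constructor(now: () => number = () => Date.now()) {
    this.now = now;
  }

  /** Idempotent upsert: registering the same machineId replaces the old entry. */
  register(machineId: string, label: string, ttlSeconds: number): void {
    const expiresAt = this.now() + ttlSeconds * 1000;
    this.entries.set(machineId, { machineId, label, expiresAt });
  }

  /** Refresh the TTL. Returns false when the machine is unknown or already expired. */
  heartbeat(machineId: string, ttlSeconds: number): boolean {
    this.sweep();
    const entry = this.entries.get(machineId);
    if (!entry) return false;
    entry.expiresAt = this.now() + ttlSeconds * 1000;
    return true;
  }

  /** Drop every entry whose TTL has lapsed. */
  sweep(): void {
    const t = this.now();
    this.entries.forEach((entry, id) => {
      if (entry.expiresAt <= t) this.entries.delete(id);
    });
  }

  liveMachines(): string[] {
    this.sweep();
    return Array.from(this.entries.keys());
  }
}

let clockMs = 0;
const registry = new MachineRegistry(() => clockMs);
const HEARTBEAT_SECONDS = 60;
const TTL_SECONDS = 3 * HEARTBEAT_SECONDS;

registry.register("node-a", "GPU box", TTL_SECONDS);
registry.register("node-a", "GPU box (renamed)", TTL_SECONDS); // upsert, not a duplicate
const countAfterDoubleRegister = registry.liveMachines().length;

clockMs = 2 * HEARTBEAT_SECONDS * 1000; // a late heartbeat, still inside the TTL
const lateHeartbeatAccepted = registry.heartbeat("node-a", TTL_SECONDS);

clockMs += TTL_SECONDS * 1000; // silence for a full TTL
const heartbeatAfterExpiry = registry.heartbeat("node-a", TTL_SECONDS);

console.log(countAfterDoubleRegister, lateHeartbeatAccepted, heartbeatAfterExpiry);
// 1 true false
```

A `false` from `heartbeat` is the cue for the machine to send a full registration again.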

Heartbeat

The heartbeat is a periodic signal from a machine to the hub, confirming liveness and updating resource state. It is lighter than a full registration: it refreshes the TTL and updates dynamic fields (free memory, load average, GPU utilization) without resending the full capability list.

interface HeartbeatPayload {
  machineId: string;
  capabilities: string[]; // Just the names, not full specs
  resources: ResourceSnapshot;
  busy: boolean;
  currentTasks?: string[]; // IDs of tasks currently executing
  version: string;
}

// Heartbeat response from the hub
interface HeartbeatResponse {
  /** Whether the hub still recognizes this machine */
  registered: boolean;
  /** If true, the machine should send a full registration again */
  reRegister?: boolean;
  /** Pending configuration updates */
  config?: Partial<ConnectorConfig>;
  /** Commands to execute (e.g., "upgrade", "restart") */
  commands?: HubCommand[];
}

The heartbeat response is also a command channel. The hub can use it to tell a machine to upgrade, change its configuration, or even deregister. This avoids the need for a separate push channel for administrative commands (though the WebSocket connection provides one too).
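
One way a connector might act on a heartbeat response is sketched below. `ConnectorConfig` and `HubCommand` are referenced above but not defined, so the shapes used here are stand-ins, as are `ConnectorActions` and `applyHeartbeatResponse`. The sketch also assumes the machine re-registers when `registered` is false or `reRegister` is true.

```typescript
// Stand-in shapes: both are assumptions made for this sketch.
interface ConnectorConfig {
  heartbeatIntervalMs: number;
  logLevel: "debug" | "info" | "warn" | "error";
}
type HubCommand = { kind: "upgrade"; version: string } | { kind: "restart" };

interface HeartbeatResponse {
  registered: boolean;
  reRegister?: boolean;
  config?: Partial<ConnectorConfig>;
  commands?: HubCommand[];
}

interface ConnectorActions {
  reRegister(): void;
  run(command: HubCommand): void;
}

/** Apply one heartbeat response and return the config to use from now on. */
function applyHeartbeatResponse(
  response: HeartbeatResponse,
  current: ConnectorConfig,
  actions: ConnectorActions,
): ConnectorConfig {
  // Assumed semantics: re-register when the hub has forgotten us or asks explicitly.
  if (!response.registered || response.reRegister === true) {
    actions.reRegister();
  }
  // Merge pushed settings over the current ones; keys the hub did not send keep their value.
  const next: ConnectorConfig = { ...current, ...(response.config ?? {}) };
  for (const command of response.commands ?? []) {
    actions.run(command);
  }
  return next;
}

const actionLog: string[] = [];
const actions: ConnectorActions = {
  reRegister: () => { actionLog.push("re-register"); },
  run: (command) => { actionLog.push(`command:${command.kind}`); },
};

const nextConfig = applyHeartbeatResponse(
  { registered: false, config: { heartbeatIntervalMs: 30000 }, commands: [{ kind: "restart" }] },
  { heartbeatIntervalMs: 60000, logLevel: "info" },
  actions,
);
console.log(nextConfig, actionLog);
```

Keeping the side effects behind `ConnectorActions` makes the decision logic easy to test without a live hub.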


The Install Script

Design Principles

The best install scripts in the ecosystem share common patterns. K3s installs a complete Kubernetes distribution with curl -sfL https://get.k3s.io | sh -. Homebrew bootstraps a package manager. mise and proto install runtime version managers. Tailscale installs a mesh VPN.

What makes them work:

  1. Detect everything, assume nothing. OS, architecture, init system, shell, existing installations.
  2. Single entry point, multiple roles. The same script installs a server or an agent depending on environment variables.
  3. Idempotent. Running the script twice produces the same result. It detects existing installations and skips or upgrades.
  4. Verifiable. Binary checksums are validated before installation.
  5. Service integration. The script configures the installed binary as a system service (launchd, systemd, openrc) so it survives reboots.
  6. Minimal dependencies. Only curl/wget, tar/unzip, and a shell. Everything else is bundled.
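
The "verifiable" principle is the one most often implemented loosely, so here is the comparison logic in isolation. It is written in TypeScript rather than shell purely to show the steps; `verifySha256` is a hypothetical helper, and the test vector is the well-known SHA-256 digest of the ASCII string "abc".

```typescript
import { createHash } from "node:crypto";

/**
 * Compare a payload against the contents of a .sha256 file.
 * Accepts both a bare digest and the "digest  filename" form that
 * sha256sum emits, and rejects anything that is not a 64-character digest.
 */
function verifySha256(payload: Uint8Array, checksumFile: string): boolean {
  const expected = (checksumFile.trim().split(/\s+/)[0] ?? "").toLowerCase();
  const actual = createHash("sha256").update(payload).digest("hex");
  return expected.length === 64 && expected === actual;
}

const payload = new TextEncoder().encode("abc");
const abcDigest = "ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad";

console.log(verifySha256(payload, abcDigest)); // true
console.log(verifySha256(payload, `${abcDigest}  connector.tar.gz\n`)); // true
console.log(verifySha256(new TextEncoder().encode("abd"), abcDigest)); // false
console.log(verifySha256(payload, "")); // false: an empty checksum file never passes
```

The last case matters in practice: a verifier that treats a missing or empty checksum as success gives no protection at all.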

K3s Install Script Anatomy

The K3s install script is the gold standard for curl | sh distributed system installers. Its structure:

verify_system()             → detect init system (systemd/openrc)
setup_verify_arch()         → map uname -m to binary suffix
setup_env()                 → determine role (server/agent from K3S_URL)
download_and_verify()       → fetch binary + SHA256 checksum, verify
setup_selinux()             → configure SELinux policies
create_env_file()           → write /etc/systemd/system/k3s.service.env
create_service_file()       → write systemd unit or openrc init script
create_killall()            → write k3s-killall.sh
create_uninstall()          → write k3s-uninstall.sh
service_enable_and_start()  → enable and start the service

Key decisions:

Atlas Connector Install Script

Here is a complete install script for the Atlas connector, following the patterns from K3s and Tailscale:

#!/bin/sh
#
#

set -eu


ATLAS_REPO="garywu/atlas"
ATLAS_API="https://api.github.com/repos/${ATLAS_REPO}/releases"
ATLAS_DEFAULT_HUB="https://api-mom.garywu.dev"
ATLAS_CONFIG_DIR="${ATLAS_CONFIG_DIR:-${HOME}/.atlas}"
ATLAS_INSTALL_DIR="${ATLAS_INSTALL_DIR:-/usr/local/bin}"
ATLAS_LOG_DIR="${ATLAS_CONFIG_DIR}/logs"


RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color

info()  { printf "${BLUE}[atlas]${NC} %s\n" "$*"; }
ok()    { printf "${GREEN}[atlas]${NC} %s\n" "$*"; }
warn()  { printf "${YELLOW}[atlas]${NC} %s\n" "$*" >&2; }
fatal() { printf "${RED}[atlas]${NC} %s\n" "$*" >&2; exit 1; }


detect_os() {
  OS="$(uname -s | tr '[:upper:]' '[:lower:]')"
  case "$OS" in
    darwin)  OS="darwin" ;;
    linux)   OS="linux" ;;
    *)       fatal "Unsupported OS: $OS" ;;
  esac
}

detect_arch() {
  ARCH="$(uname -m)"
  case "$ARCH" in
    x86_64|amd64)   ARCH="amd64" ;;
    aarch64|arm64)   ARCH="arm64" ;;
    armv7l|armhf)    ARCH="arm" ;;
    *)               fatal "Unsupported architecture: $ARCH" ;;
  esac
}

detect_init_system() {
  if [ -d /run/systemd/system ]; then
    INIT_SYSTEM="systemd"
  elif command -v launchctl >/dev/null 2>&1; then
    INIT_SYSTEM="launchd"
  elif command -v openrc-run >/dev/null 2>&1; then
    INIT_SYSTEM="openrc"
  else
    INIT_SYSTEM="none"
    warn "No supported init system detected; skipping service setup"
  fi
}


check_dependencies() {
  for cmd in curl tar; do
    if ! command -v "$cmd" >/dev/null 2>&1; then
      fatal "Required command not found: $cmd"
    fi
  done

  # Check for Node.js (required for the connector)
  if ! command -v node >/dev/null 2>&1; then
    warn "Node.js not found; attempting to install it (mise, Homebrew, or apt)"
    install_node_runtime
  fi

  NODE_VERSION="$(node --version 2>/dev/null || echo 'none')"
  info "Node.js version: $NODE_VERSION"
}

install_node_runtime() {
  if command -v mise >/dev/null 2>&1; then
    info "Installing Node.js via mise..."
    mise install node@lts
    mise use --global node@lts
  elif command -v brew >/dev/null 2>&1; then
    info "Installing Node.js via Homebrew..."
    brew install node
  elif command -v apt-get >/dev/null 2>&1; then
    info "Installing Node.js via apt..."
    # The NodeSource setup script expects bash and root privileges
    curl -fsSL https://deb.nodesource.com/setup_lts.x | sudo -E bash -
    sudo apt-get install -y nodejs
  else
    fatal "Cannot install Node.js automatically; please install it manually"
  fi
}


parse_args() {
  ROLE="${ATLAS_ROLE:-node}"
  HUB_URL="${ATLAS_HUB_URL:-${ATLAS_DEFAULT_HUB}}"
  TOKEN="${ATLAS_TOKEN:-}"
  MACHINE_ID="${ATLAS_MACHINE_ID:-$(hostname)}"
  LABEL=""
  CHANNEL="stable"
  VERSION=""
  SKIP_SERVICE=false
  SKIP_START=false
  UNINSTALL=false

  while [ $# -gt 0 ]; do
    case "$1" in
      --role)        ROLE="$2";       shift 2 ;;
      --hub)         HUB_URL="$2";    shift 2 ;;
      --token)       TOKEN="$2";      shift 2 ;;
      --machine-id)  MACHINE_ID="$2"; shift 2 ;;
      --label)       LABEL="$2";      shift 2 ;;
      --channel)     CHANNEL="$2";    shift 2 ;;
      --version)     VERSION="$2";    shift 2 ;;
      --skip-service) SKIP_SERVICE=true; shift ;;
      --skip-start)  SKIP_START=true; shift ;;
      --uninstall)   UNINSTALL=true;  shift ;;
      *)             fatal "Unknown option: $1" ;;
    esac
  done

  if [ -z "$LABEL" ]; then
    LABEL="$(hostname) (${OS}/${ARCH})"
  fi
}


resolve_version() {
  if [ -n "$VERSION" ]; then
    info "Using pinned version: $VERSION"
    return
  fi

  info "Resolving latest ${CHANNEL} version..."
  VERSION=$(curl -sfL "${ATLAS_API}/latest" | \
    grep '"tag_name"' | \
    sed -E 's/.*"tag_name": *"([^"]+)".*/\1/')

  if [ -z "$VERSION" ]; then
    fatal "Could not determine latest version"
  fi

  info "Latest version: $VERSION"
}


download_binary() {
  BINARY_NAME="atlas-connector-${OS}-${ARCH}"
  DOWNLOAD_URL="https://github.com/${ATLAS_REPO}/releases/download/${VERSION}/${BINARY_NAME}.tar.gz"
  CHECKSUM_URL="${DOWNLOAD_URL}.sha256"

  TMPDIR="$(mktemp -d)"
  trap 'rm -rf "$TMPDIR"' EXIT

  info "Downloading ${BINARY_NAME} ${VERSION}..."
  curl -sfL -o "${TMPDIR}/connector.tar.gz" "$DOWNLOAD_URL" || \
    fatal "Download failed: $DOWNLOAD_URL"

  info "Verifying checksum..."
  if curl -sfL -o "${TMPDIR}/connector.sha256" "$CHECKSUM_URL"; then
    # Accept a bare digest or "digest  filename" by taking the first field
    EXPECTED="$(cut -d' ' -f1 < "${TMPDIR}/connector.sha256" | head -n 1)"
    if command -v sha256sum >/dev/null 2>&1; then
      ACTUAL="$(sha256sum "${TMPDIR}/connector.tar.gz" | cut -d' ' -f1)"
    elif command -v shasum >/dev/null 2>&1; then
      ACTUAL="$(shasum -a 256 "${TMPDIR}/connector.tar.gz" | cut -d' ' -f1)"
    else
      fatal "No SHA-256 tool found (need sha256sum or shasum)"
    fi
    [ -n "$EXPECTED" ] && [ "$EXPECTED" = "$ACTUAL" ] || \
      fatal "Checksum verification failed"
    ok "Checksum verified"
  else
    warn "Checksum file not available; skipping verification"
  fi

  info "Extracting..."
  tar xzf "${TMPDIR}/connector.tar.gz" -C "$TMPDIR"
}


install_binary() {
  info "Installing to ${ATLAS_INSTALL_DIR}/"

  # Ensure install directory exists
  if [ ! -w "$ATLAS_INSTALL_DIR" ]; then
    if command -v sudo >/dev/null 2>&1; then
      sudo mkdir -p "$ATLAS_INSTALL_DIR"
      sudo cp "${TMPDIR}/atlas-connector" "${ATLAS_INSTALL_DIR}/"
      sudo chmod +x "${ATLAS_INSTALL_DIR}/atlas-connector"
    else
      fatal "Cannot write to ${ATLAS_INSTALL_DIR}; run as root or set ATLAS_INSTALL_DIR"
    fi
  else
    mkdir -p "$ATLAS_INSTALL_DIR"
    cp "${TMPDIR}/atlas-connector" "${ATLAS_INSTALL_DIR}/"
    chmod +x "${ATLAS_INSTALL_DIR}/atlas-connector"
  fi

  ok "Installed atlas-connector to ${ATLAS_INSTALL_DIR}/atlas-connector"
}


setup_config() {
  mkdir -p "$ATLAS_CONFIG_DIR"
  mkdir -p "$ATLAS_LOG_DIR"
  mkdir -p "${ATLAS_CONFIG_DIR}/keys"

  # Generate machine ID file if not present
  MACHINE_ID_FILE="${ATLAS_CONFIG_DIR}/machine-id"
  if [ ! -f "$MACHINE_ID_FILE" ]; then
    if command -v uuidgen >/dev/null 2>&1; then
      uuidgen | tr '[:upper:]' '[:lower:]' > "$MACHINE_ID_FILE"
    else
      cat /proc/sys/kernel/random/uuid > "$MACHINE_ID_FILE" 2>/dev/null || \
        head -c 16 /dev/urandom | od -An -tx1 | tr -d ' \n' | \
        sed 's/\(.\{8\}\)\(.\{4\}\)\(.\{4\}\)\(.\{4\}\)/\1-\2-\3-\4-/' \
        > "$MACHINE_ID_FILE"
    fi
    info "Generated machine ID: $(cat "$MACHINE_ID_FILE")"
  else
    info "Existing machine ID: $(cat "$MACHINE_ID_FILE")"
  fi

  # Generate Ed25519 keypair if not present
  KEY_FILE="${ATLAS_CONFIG_DIR}/keys/machine.key"
  PUB_FILE="${ATLAS_CONFIG_DIR}/keys/machine.pub"
  chmod 700 "${ATLAS_CONFIG_DIR}/keys"
  if [ ! -f "$KEY_FILE" ]; then
    info "Generating Ed25519 keypair..."
    if command -v ssh-keygen >/dev/null 2>&1; then
      ssh-keygen -t ed25519 -f "$KEY_FILE" -N "" -C "atlas-connector@$(hostname)" \
        >/dev/null 2>&1
      # ssh-keygen writes the public key to "${KEY_FILE}.pub"; move it to the expected path
      mv "${KEY_FILE}.pub" "$PUB_FILE"
      ok "Keypair generated"
    elif command -v openssl >/dev/null 2>&1; then
      openssl genpkey -algorithm Ed25519 -out "$KEY_FILE" 2>/dev/null
      openssl pkey -in "$KEY_FILE" -pubout -out "$PUB_FILE" 2>/dev/null
      ok "Keypair generated (OpenSSL)"
    else
      warn "No key generation tool available; skipping keypair generation"
    fi
    chmod 600 "$KEY_FILE" 2>/dev/null || true
  fi

  # Write config file
  CONFIG_FILE="${ATLAS_CONFIG_DIR}/connector.json"
  cat > "$CONFIG_FILE" <<EOF
{
  "machineId": "$(cat "$MACHINE_ID_FILE")",
  "label": "${LABEL}",
  "role": "${ROLE}",
  "hubUrl": "${HUB_URL}",
  "protocolVersion": "1.0",
  "heartbeatIntervalMs": 60000,
  "registrationTtlSeconds": 180,
  "capabilities": {
    "autoDetect": true,
    "additional": []
  }
}
EOF

  ok "Configuration written to $CONFIG_FILE"
}


setup_service() {
  if [ "$SKIP_SERVICE" = true ]; then
    info "Skipping service setup (--skip-service)"
    return
  fi

  case "$INIT_SYSTEM" in
    launchd)  setup_launchd ;;
    systemd)  setup_systemd ;;
    openrc)   setup_openrc ;;
    *)        warn "Cannot configure service; no supported init system" ;;
  esac
}

setup_launchd() {
  PLIST_PATH="${HOME}/Library/LaunchAgents/com.atlas.connector.plist"
  # LaunchAgents may not exist yet on a fresh user account
  mkdir -p "${HOME}/Library/LaunchAgents"
  info "Writing launchd plist: $PLIST_PATH"

  cat > "$PLIST_PATH" <<EOF
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN"
  "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
    <key>Label</key>
    <string>com.atlas.connector</string>

    <key>ProgramArguments</key>
    <array>
        <string>${ATLAS_INSTALL_DIR}/atlas-connector</string>
    </array>

    <key>WorkingDirectory</key>
    <string>${ATLAS_CONFIG_DIR}</string>

    <key>RunAtLoad</key>
    <true/>

    <key>KeepAlive</key>
    <dict>
        <key>SuccessfulExit</key>
        <false/>
    </dict>

    <key>ThrottleInterval</key>
    <integer>10</integer>

    <key>StandardOutPath</key>
    <string>${ATLAS_LOG_DIR}/connector.log</string>

    <key>StandardErrorPath</key>
    <string>${ATLAS_LOG_DIR}/connector.err.log</string>

    <key>EnvironmentVariables</key>
    <dict>
        <key>PATH</key>
        <string>/opt/homebrew/bin:/usr/local/bin:/usr/bin:/bin</string>
        <key>ATLAS_CONFIG_DIR</key>
        <string>${ATLAS_CONFIG_DIR}</string>
        <key>ATLAS_TOKEN</key>
        <string>${TOKEN}</string>
    </dict>
</dict>
</plist>
EOF

  # The plist embeds ATLAS_TOKEN, so restrict it to the owning user
  chmod 600 "$PLIST_PATH"
  ok "launchd service configured"
}

setup_systemd() {
  UNIT_FILE="/etc/systemd/system/atlas-connector.service"
  info "Writing systemd unit: $UNIT_FILE"

  sudo tee "$UNIT_FILE" >/dev/null <<EOF
[Unit]
Description=Atlas Connector
After=network-online.target
Wants=network-online.target

[Service]
Type=simple
ExecStart=${ATLAS_INSTALL_DIR}/atlas-connector
WorkingDirectory=${ATLAS_CONFIG_DIR}
Restart=on-failure
RestartSec=10
StandardOutput=journal
StandardError=journal
Environment=ATLAS_CONFIG_DIR=${ATLAS_CONFIG_DIR}
Environment=ATLAS_TOKEN=${TOKEN}
Environment=PATH=/usr/local/bin:/usr/bin:/bin

NoNewPrivileges=true
ProtectSystem=strict
ProtectHome=read-only
ReadWritePaths=${ATLAS_CONFIG_DIR}
PrivateTmp=true

[Install]
WantedBy=multi-user.target
EOF

  sudo systemctl daemon-reload
  sudo systemctl enable atlas-connector
  ok "systemd service configured and enabled"
}

setup_openrc() {
  INIT_SCRIPT="/etc/init.d/atlas-connector"
  info "Writing OpenRC init script: $INIT_SCRIPT"

  # Unquoted delimiter so ${ATLAS_INSTALL_DIR} and ${ATLAS_LOG_DIR} expand at install time
  sudo tee "$INIT_SCRIPT" >/dev/null <<INITEOF
#!/sbin/openrc-run
description="Atlas Connector"
command="${ATLAS_INSTALL_DIR}/atlas-connector"
command_background=true
pidfile="/var/run/atlas-connector.pid"
output_log="${ATLAS_LOG_DIR}/connector.log"
error_log="${ATLAS_LOG_DIR}/connector.err.log"

depend() {
    need net
    after firewall
}
INITEOF

  sudo chmod +x "$INIT_SCRIPT"
  sudo rc-update add atlas-connector default
  ok "OpenRC service configured"
}


register_with_hub() {
  if [ -z "$TOKEN" ]; then
    warn "No token provided; skipping initial registration"
    warn "Set ATLAS_TOKEN or use --token to register with the hub"
    return
  fi

  info "Registering with hub at ${HUB_URL}..."

  RESPONSE=$(curl -sf -X POST \
    -H "Content-Type: application/json" \
    -H "Authorization: Bearer ${TOKEN}" \
    -d "{
      \"machineId\": \"$(cat "${ATLAS_CONFIG_DIR}/machine-id")\",
      \"label\": \"${LABEL}\",
      \"role\": \"${ROLE}\",
      \"resources\": {
        \"platform\": \"${OS}\",
        \"arch\": \"${ARCH}\",
        \"cpus\": $(nproc 2>/dev/null || sysctl -n hw.ncpu 2>/dev/null || echo 1)
      },
      \"ttlSeconds\": 180
    }" \
    "${HUB_URL}/v1/machines/register" 2>&1) || {
    warn "Registration failed; connector will retry on startup"
    return
  }

  ok "Registered with hub"
}


start_service() {
  if [ "$SKIP_START" = true ]; then
    info "Skipping service start (--skip-start)"
    return
  fi

  case "$INIT_SYSTEM" in
    launchd)
      launchctl load "$PLIST_PATH" 2>/dev/null || true
      launchctl start com.atlas.connector 2>/dev/null || true
      ok "Service started via launchd"
      ;;
    systemd)
      sudo systemctl start atlas-connector
      ok "Service started via systemd"
      ;;
    openrc)
      sudo rc-service atlas-connector start
      ok "Service started via OpenRC"
      ;;
    *)
      info "Start the connector manually:"
      info "  atlas-connector"
      ;;
  esac
}


do_uninstall() {
  info "Uninstalling Atlas connector..."

  # Stop service
  case "$INIT_SYSTEM" in
    launchd)
      launchctl unload "${HOME}/Library/LaunchAgents/com.atlas.connector.plist" 2>/dev/null || true
      rm -f "${HOME}/Library/LaunchAgents/com.atlas.connector.plist"
      ;;
    systemd)
      sudo systemctl stop atlas-connector 2>/dev/null || true
      sudo systemctl disable atlas-connector 2>/dev/null || true
      sudo rm -f /etc/systemd/system/atlas-connector.service
      sudo systemctl daemon-reload
      ;;
    openrc)
      sudo rc-service atlas-connector stop 2>/dev/null || true
      sudo rc-update del atlas-connector 2>/dev/null || true
      sudo rm -f /etc/init.d/atlas-connector
      ;;
  esac

  # Deregister from hub
  if [ -f "${ATLAS_CONFIG_DIR}/machine-id" ] && [ -n "${TOKEN:-}" ]; then
    MID="$(cat "${ATLAS_CONFIG_DIR}/machine-id")"
    curl -sf -X DELETE \
      -H "Authorization: Bearer ${TOKEN}" \
      "${HUB_URL}/v1/machines/${MID}" 2>/dev/null || true
    info "Deregistered from hub"
  fi

  # Remove binary
  rm -f "${ATLAS_INSTALL_DIR}/atlas-connector"

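  # Ask about config. Read from the terminal, not stdin: under `curl | sh`,
  # stdin is the script itself, so a plain `read` would hit EOF and abort under set -e.
  printf "Remove configuration directory %s? [y/N] " "$ATLAS_CONFIG_DIR"
  answer=""
  read -r answer </dev/tty || answer=""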
  if [ "$answer" = "y" ] || [ "$answer" = "Y" ]; then
    rm -rf "$ATLAS_CONFIG_DIR"
    ok "Configuration removed"
  else
    info "Configuration preserved at $ATLAS_CONFIG_DIR"
  fi

  ok "Atlas connector uninstalled"
  exit 0
}


main() {
  info "Atlas Connector Installer"
  info "========================="

  detect_os
  detect_arch
  detect_init_system
  parse_args "$@"

  info "OS: ${OS}  Arch: ${ARCH}  Init: ${INIT_SYSTEM}  Role: ${ROLE}"

  if [ "$UNINSTALL" = true ]; then
    do_uninstall
  fi

  check_dependencies
  resolve_version
  download_binary
  install_binary
  setup_config
  setup_service
  register_with_hub
  start_service

  ok "========================================="
  ok "Atlas Connector installed successfully!"
  ok "========================================="
  ok ""
  ok "  Machine ID:  $(cat "${ATLAS_CONFIG_DIR}/machine-id")"
  ok "  Role:        ${ROLE}"
  ok "  Hub:         ${HUB_URL}"
  ok "  Config:      ${ATLAS_CONFIG_DIR}/connector.json"
  ok "  Logs:        ${ATLAS_LOG_DIR}/connector.log"
  ok ""
  ok "Manage the service:"
  case "$INIT_SYSTEM" in
    launchd)
      ok "  launchctl start com.atlas.connector"
      ok "  launchctl stop com.atlas.connector"
      ok "  tail -f ${ATLAS_LOG_DIR}/connector.log"
      ;;
    systemd)
      ok "  sudo systemctl status atlas-connector"
      ok "  sudo journalctl -u atlas-connector -f"
      ;;
    openrc)
      ok "  sudo rc-service atlas-connector status"
      ok "  tail -f ${ATLAS_LOG_DIR}/connector.log"
      ;;
  esac
}

main "$@"

What this script does differently from most installers

  1. Generates a persistent machine identity. It does not just install a binary; it creates a cryptographic identity that will represent this machine in the network.

  2. Auto-detects and installs Node.js via whichever package manager is available (mise, Homebrew, apt).

  3. Configures the init system automatically. Detects launchd (macOS), systemd (Linux), or OpenRC and writes the appropriate service configuration with security hardening.

  4. Registers with the hub during installation. The machine is immediately discoverable by other nodes in the network.

  5. Includes a complete uninstaller that deregisters from the hub, stops the service, removes the binary, and optionally cleans up configuration.

Key insight: The best install scripts do not just copy a binary. They establish identity, configure supervision, and integrate the new component into an existing system. The install script is the onboarding protocol.
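
On the other side of that handoff, the connector has to read what the installer wrote. The sketch below parses and sanity-checks a `connector.json` with the fields that `setup_config` emits. `parseConnectorConfig` and its helpers are hypothetical, and the rule that the TTL must exceed the heartbeat interval is an assumption derived from the TTL guidance earlier in the article.

```typescript
type Role = "node" | "hub" | "hybrid";

interface ConnectorFile {
  machineId: string;
  label: string;
  role: Role;
  hubUrl: string;
  heartbeatIntervalMs: number;
  registrationTtlSeconds: number;
}

const ROLES: readonly string[] = ["node", "hub", "hybrid"];

function requireString(raw: Record<string, unknown>, key: string): string {
  const value = raw[key];
  if (typeof value !== "string" || value.length === 0) {
    throw new Error(`connector.json: "${key}" must be a non-empty string`);
  }
  return value;
}

function requirePositive(raw: Record<string, unknown>, key: string, fallback: number): number {
  const value = raw[key] ?? fallback;
  if (typeof value !== "number" || !(value > 0)) {
    throw new Error(`connector.json: "${key}" must be a positive number`);
  }
  return value;
}

/** Parse and sanity-check the file written by setup_config(). */
function parseConnectorConfig(text: string): ConnectorFile {
  const parsed: unknown = JSON.parse(text);
  if (typeof parsed !== "object" || parsed === null) {
    throw new Error("connector.json: expected a JSON object");
  }
  const raw = parsed as Record<string, unknown>;
  const role = requireString(raw, "role");
  if (!ROLES.includes(role)) {
    throw new Error(`connector.json: unknown role "${role}"`);
  }
  // Defaults mirror the values the install script writes.
  const heartbeatIntervalMs = requirePositive(raw, "heartbeatIntervalMs", 60000);
  const registrationTtlSeconds = requirePositive(raw, "registrationTtlSeconds", 180);
  // A TTL no longer than the heartbeat interval would expire between two heartbeats.
  if (registrationTtlSeconds * 1000 <= heartbeatIntervalMs) {
    throw new Error("connector.json: TTL must exceed the heartbeat interval");
  }
  return {
    machineId: requireString(raw, "machineId"),
    label: requireString(raw, "label"),
    role: role as Role,
    hubUrl: requireString(raw, "hubUrl"),
    heartbeatIntervalMs,
    registrationTtlSeconds,
  };
}

const sampleConfig = parseConnectorConfig(JSON.stringify({
  machineId: "3f2b8c1e-0000-4000-8000-000000000000", // made-up ID for the demo
  label: "studio (darwin/arm64)",
  role: "node",
  hubUrl: "https://api-mom.garywu.dev",
  protocolVersion: "1.0",
  heartbeatIntervalMs: 60000,
  registrationTtlSeconds: 180,
}));
console.log(sampleConfig.role, sampleConfig.registrationTtlSeconds);
```

Failing fast on a malformed file gives the service manager a clear error to log instead of a connector that silently never registers.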


Protocol Messages

The connector communicates with the hub using a typed message protocol over WebSocket. Here is the complete type system:

// ── Messages from Hub → Machine (Server Messages) ──────────────────────────

type ServerMessage =
  | DispatchRequest
  | PingMessage
  | PongMessage
  | UpdateCommand
  | ConfigPush
  | CapabilityQuery;

interface DispatchRequest {
  type: "request";
  /** Unique request ID for correlation */
  id: string;
  /** Capability to invoke */
  capability: string;
  /** Input payload for the capability handler */
  input: unknown;
  /** Optional: which pipeline this request belongs to */
  pipelineId?: string;
  /** Optional: step index within the pipeline */
  stepIndex?: number;
  /** Timeout in milliseconds */
  timeoutMs?: number;
}

interface PingMessage {
  type: "ping";
  timestamp: number;
}

interface PongMessage {
  type: "pong";
  timestamp: number;
}

interface UpdateCommand {
  type: "update";
  /** Target version to upgrade to */
  version?: string;
  /** URL to download the update from */
  url?: string;
  /** Whether to restart after update */
  restart?: boolean;
}

interface ConfigPush {
  type: "config-push";
  /** Partial config to merge */
  config: Partial<{
    heartbeatIntervalMs: number;
    capabilities: string[];
    logLevel: "debug" | "info" | "warn" | "error";
  }>;
}

interface CapabilityQuery {
  type: "capability-query";
  /** Request ID */
  id: string;
  /** Capability name to check */
  capability: string;
}

// ── Messages from Machine → Hub (Client Messages) ──────────────────────────

type ClientMessage =
  | ResponseMessage
  | StatusReport
  | PingMessage
  | PongMessage
  | CapabilityAdvertisement
  | TaskProgress;

interface ResponseMessage {
  type: "response";
  /** Correlates with the request ID */
  id: string;
  /** HTTP-style status code */
  status: number;
  /** Result payload */
  body: unknown;
}

interface StatusReport {
  type: "status-report";
  /** Machine ID */
  machine: string;
  /** Whether the machine is currently processing a task */
  busy: boolean;
  /** Connector version */
  version: string;
  /** Current resource snapshot */
  resources?: ResourceSnapshot;
  /** IDs of tasks currently executing */
  activeTasks?: string[];
}

interface CapabilityAdvertisement {
  type: "capability-advertisement";
  /** Full capability specs (sent on connect and when capabilities change) */
  capabilities: CapabilitySpec[];
}

interface TaskProgress {
  type: "task-progress";
  /** Request ID this progress is for */
  id: string;
  /** Progress percentage (0-100) */
  progress: number;
  /** Human-readable status message */
  message?: string;
  /** Partial result (for streaming) */
  partial?: unknown;
}
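Because every message carries a `type` discriminator, the receiving side can validate a raw frame before acting on it. The following is a minimal sketch of that idea, assuming a reduced two-member subset of the union above; `parseServerMessage` and `isRecord` are hypothetical helpers for illustration, not part of the Atlas connector.

```typescript
// Sketch only: a reduced subset of the ServerMessage union, and a hypothetical
// parseServerMessage() helper that is not part of the Atlas connector.
type PingMessage = { type: "ping"; timestamp: number };
type DispatchRequest = {
  type: "request";
  id: string;
  capability: string;
  input: unknown;
  timeoutMs?: number;
};
type ServerMessage = PingMessage | DispatchRequest;

function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === "object" && value !== null && !Array.isArray(value);
}

/** Returns a typed message, or null when the frame is malformed or unknown. */
function parseServerMessage(raw: string): ServerMessage | null {
  let data: unknown;
  try {
    data = JSON.parse(raw);
  } catch {
    return null; // not valid JSON
  }
  if (!isRecord(data)) return null;

  const { type, timestamp, id, capability, input, timeoutMs } = data;

  if (type === "ping") {
    return typeof timestamp === "number" ? { type: "ping", timestamp } : null;
  }
  if (type === "request") {
    if (typeof id !== "string" || typeof capability !== "string") return null;
    const request: DispatchRequest = { type: "request", id, capability, input };
    if (typeof timeoutMs === "number") request.timeoutMs = timeoutMs;
    return request;
  }
  return null; // unknown discriminator: ignore instead of crashing
}
```

Returning `null` for unknown types lets an older connector ignore message types added by a newer hub instead of throwing inside the WebSocket handler.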

Registration Flow

The full registration flow has six phases:

Machine                                Hub (Atlas Center DO)
  │                                        │
  │  1. POST /v1/machines/register         │
  │  ──────────────────────────────────►   │
  │  { machineId, capabilities,            │
  │    resources, ttlSeconds }             │
  │                                        │  Store in machine registry
  │  ◄──────────────────────────────────   │  Update capability index
  │  { registered: true, hubWsUrl }        │
  │                                        │
  │  2. WebSocket connect                  │
  │  ══════════════════════════════════►   │
  │  Authorization: Bearer <token>         │
  │                                        │
  │  3. status-report (initial)            │
  │  ──────────────────────────────────►   │
  │  { machine, busy: false, version }     │  Mark machine as "connected"
  │                                        │
  │  4. capability-advertisement           │
  │  ──────────────────────────────────►   │
  │  { capabilities: [...full specs] }     │  Merge into capability index
  │                                        │
  │  5. heartbeat loop (every 60s)         │
  │  ──────────────────────────────────►   │
  │  POST /v1/machines/heartbeat           │  Refresh TTL
  │  { machineId, resources }              │  Update resource snapshot
  │  ◄──────────────────────────────────   │
  │  { registered: true, commands: [] }    │
  │                                        │
  │  6. dispatch (when work arrives)       │
  │  ◄══════════════════════════════════   │
  │  { type: "request", capability,        │
  │    input, id }                         │
  │  ══════════════════════════════════►   │
  │  { type: "response", id, status,       │
  │    body }                              │
  │                                        │
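The heartbeat in phase 5 only helps if the TTL is longer than the heartbeat interval, so that one or two lost heartbeats do not expire the registration. Here is a small sketch of that arithmetic. The `Lease` type and both helpers are hypothetical names, and the 180-second TTL is an assumed value chosen as three times the 60-second interval shown in the diagram.

```typescript
// Sketch: Lease, isLive and missedHeartbeats are hypothetical names.
interface Lease {
  lastHeartbeatMs: number; // when the hub last heard from the machine
  ttlSeconds: number; // value the machine sent at registration
}

/** A registration is live until lastHeartbeat + TTL has passed. */
function isLive(lease: Lease, nowMs: number): boolean {
  return nowMs <= lease.lastHeartbeatMs + lease.ttlSeconds * 1000;
}

/** Number of whole heartbeat intervals that elapsed without a refresh. */
function missedHeartbeats(
  lease: Lease,
  nowMs: number,
  heartbeatIntervalMs: number
): number {
  const elapsed = nowMs - lease.lastHeartbeatMs;
  return Math.max(0, Math.floor(elapsed / heartbeatIntervalMs));
}

// Assumed values: 60 s heartbeat, 180 s TTL (tolerates two missed heartbeats).
const exampleLease: Lease = { lastHeartbeatMs: 0, ttlSeconds: 180 };
const stillLive = isLive(exampleLease, 125_000);
const missed = missedHeartbeats(exampleLease, 125_000, 60_000);
```

With these assumed numbers a machine that has missed two heartbeats is still routable, and it drops out of the registry once the third interval passes without a refresh.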

Hub-Side Registry (Durable Object)

The hub maintains a machine registry as a Durable Object. Here is the core data structure and routing logic:

interface MachineRegistry {
  /** All registered machines, keyed by machine ID */
  machines: Map<string, RegisteredMachine>;

  /** Capability index: capability name → set of machine IDs */
  capabilityIndex: Map<string, Set<string>>;

  /** Active WebSocket connections, keyed by machine ID */
  connections: Map<string, WebSocket>;
}

interface RegisteredMachine {
  machineId: string;
  label: string;
  role: "node" | "hub" | "hybrid";
  capabilities: CapabilitySpec[];
  resources: ResourceSnapshot;
  registeredAt: number;
  lastHeartbeat: number;
  ttlSeconds: number;
  busy: boolean;
  activeTasks: string[];
  version: string;
  connected: boolean;
}

class AtlasCenterDO {
  private registry: MachineRegistry = {
    machines: new Map(),
    capabilityIndex: new Map(),
    connections: new Map(),
  };

  /**
   * Find machines that can handle a given capability.
   * Returns machines sorted by suitability (idle first, then lowest load average).
   */
  findMachinesForCapability(
    capabilityName: string,
    requirements?: Partial<CapabilitySpec["resources"]>
  ): RegisteredMachine[] {
    const machineIds = this.registry.capabilityIndex.get(capabilityName);
    if (!machineIds) return [];

    const now = Date.now();
    const candidates: RegisteredMachine[] = [];

    for (const id of machineIds) {
      const machine = this.registry.machines.get(id);
      if (!machine) continue;

      // Check TTL: skip expired registrations
      const expiresAt = machine.lastHeartbeat + machine.ttlSeconds * 1000;
      if (now > expiresAt) {
        this.deregisterMachine(id);
        continue;
      }

      // Check if machine is connected
      if (!machine.connected) continue;

      // Check resource requirements
      if (requirements) {
        if (requirements.gpu && !machine.resources.gpus?.length) continue;
        if (
          requirements.minMemoryGb &&
          machine.resources.freeMemoryGb < requirements.minMemoryGb
        )
          continue;
        if (
          requirements.minCpuCores &&
          machine.resources.cpus < requirements.minCpuCores
        )
          continue;
      }

      // Check concurrency limits
      const capSpec = machine.capabilities.find(
        (c) => c.name === capabilityName
      );
      if (
        capSpec?.maxConcurrency &&
        machine.activeTasks.length >= capSpec.maxConcurrency
      )
        continue;

      candidates.push(machine);
    }

    // Sort: prefer idle machines, then the lowest 1-minute load average
    return candidates.sort((a, b) => {
      if (a.busy !== b.busy) return a.busy ? 1 : -1;
      const aLoad = a.resources.loadAvg?.[0] ?? 0;
      const bLoad = b.resources.loadAvg?.[0] ?? 0;
      return aLoad - bLoad;
    });
  }

  /**
   * Dispatch a request to the best available machine for a capability.
   */
  async dispatchToCapability(
    capabilityName: string,
    input: unknown,
    options?: { timeoutMs?: number; pipelineId?: string; stepIndex?: number }
  ): Promise<{ machineId: string; result: unknown }> {
    const candidates = this.findMachinesForCapability(capabilityName);

    if (candidates.length === 0) {
      throw new Error(
        `No machines available for capability "${capabilityName}"`
      );
    }

    const target = candidates[0];
    const ws = this.registry.connections.get(target.machineId);

    if (!ws) {
      throw new Error(
        `Machine "${target.machineId}" is registered but not connected`
      );
    }

    const requestId = crypto.randomUUID();

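    return new Promise((resolve, reject) => {
      // On timeout, drop the pending entry so the map cannot grow without bound
      const timeout = setTimeout(() => {
        this.pendingRequests.delete(requestId);
        reject(
          new Error(
            `Timeout waiting for response from "${target.machineId}" ` +
              `(capability: ${capabilityName})`
          )
        );
      }, options?.timeoutMs ?? 30_000);

      // Store the resolver so the WebSocket message handler can call it
      this.pendingRequests.set(requestId, {
        resolve: (result: unknown) => {
          clearTimeout(timeout);
          this.pendingRequests.delete(requestId);
          resolve({ machineId: target.machineId, result });
        },
        reject: (error: Error) => {
          clearTimeout(timeout);
          this.pendingRequests.delete(requestId);
          reject(error);
        },
      });

      const message: DispatchRequest = {
        type: "request",
        id: requestId,
        capability: capabilityName,
        input,
        pipelineId: options?.pipelineId,
        stepIndex: options?.stepIndex,
        timeoutMs: options?.timeoutMs,
      };

      // send() can throw if the socket closed after the lookup; fail fast
      try {
        ws.send(JSON.stringify(message));
      } catch (err) {
        this.pendingRequests
          .get(requestId)
          ?.reject(err instanceof Error ? err : new Error(String(err)));
      }
    });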
  }

  private pendingRequests = new Map<
    string,
    {
      resolve: (result: unknown) => void;
      reject: (error: Error) => void;
    }
  >();

  /**
   * Remove a machine from the registry and update the capability index.
   */
  private deregisterMachine(machineId: string): void {
    const machine = this.registry.machines.get(machineId);
    if (!machine) return;

    // Remove from capability index
    for (const cap of machine.capabilities) {
      const machines = this.registry.capabilityIndex.get(cap.name);
      if (machines) {
        machines.delete(machineId);
        if (machines.size === 0) {
          this.registry.capabilityIndex.delete(cap.name);
        }
      }
    }

    // Close WebSocket if still open
    const ws = this.registry.connections.get(machineId);
    if (ws) {
      try {
        ws.close(1000, "deregistered");
      } catch {
        // ignore
      }
      this.registry.connections.delete(machineId);
    }

    this.registry.machines.delete(machineId);
  }
}
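The dispatch code stores a resolver "so the WebSocket message handler can call it", but that handler is not shown. A minimal sketch of the missing half might look like the following; `settlePending`, `HubResponse`, and `PendingEntry` are hypothetical names, and treating any 2xx status as success is an assumption based on the HTTP-style status field.

```typescript
// Sketch: mirrors the shape of pendingRequests; all names are hypothetical.
interface HubResponse {
  type: "response";
  id: string;
  status: number;
  body: unknown;
}

interface PendingEntry {
  resolve: (result: unknown) => void;
  reject: (error: Error) => void;
}

/**
 * Settle the pending request that matches a response.
 * Returns false for late or duplicate responses (for example after a timeout).
 */
function settlePending(
  pending: Map<string, PendingEntry>,
  msg: HubResponse
): boolean {
  const entry = pending.get(msg.id);
  if (!entry) return false;

  pending.delete(msg.id); // settle at most once

  if (msg.status >= 200 && msg.status < 300) {
    entry.resolve(msg.body);
  } else {
    entry.reject(
      new Error(`Request ${msg.id} failed with status ${msg.status}`)
    );
  }
  return true;
}
```

Deleting the entry before calling the resolver means a duplicate response for the same ID is reported as unmatched rather than resolving the promise twice.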

Connector-Side Handler

Here is how the connector processes incoming dispatch requests. This is a production-quality implementation based on the real Atlas connector:

import * as os from "node:os";
import WebSocket from "ws";
import { z } from "zod";

// ── Capability Registry ─────────────────────────────────────────────────────

type CapabilityResult = {
  success: boolean;
  data?: unknown;
  error?: string;
};

type CapabilityHandler = (input: unknown) => Promise<CapabilityResult>;

const CAPABILITY_HANDLERS: Map<string, CapabilityHandler> = new Map();

/**
 * Register a capability handler. Called at startup.
 */
function registerCapability(
  name: string,
  handler: CapabilityHandler
): void {
  CAPABILITY_HANDLERS.set(name, handler);
}

// ── Built-in Capabilities ───────────────────────────────────────────────────

registerCapability("jane-status", async () => ({
  success: true,
  data: {
    hostname: os.hostname(),
    platform: os.platform(),
    arch: os.arch(),
    uptimeSeconds: os.uptime(),
    cpu: {
      model: os.cpus()[0]?.model ?? "unknown",
      cores: os.cpus().length,
      loadAvg: os.loadavg(),
    },
    memory: {
      totalGb: +(os.totalmem() / 1e9).toFixed(1),
      freeGb: +(os.freemem() / 1e9).toFixed(1),
      usedPct: +(
        ((os.totalmem() - os.freemem()) / os.totalmem()) *
        100
      ).toFixed(1),
    },
  },
}));

registerCapability("hud-notify", async (input) => {
  const schema = z.object({
    severity: z.enum(["green", "yellow", "red"]),
    message: z.string(),
    source: z.string().default("connector"),
  });

  const parsed = schema.safeParse(input);
  if (!parsed.success) {
    return { success: false, error: parsed.error.message };
  }

  // Write to the HUD status queue file
  const fs = await import("node:fs/promises");
  const path = await import("node:path");
  const queuePath = path.join(os.homedir(), ".atlas", "status-queue.json");

  const msg = {
    id: crypto.randomUUID(),
    ...parsed.data,
    created: new Date().toISOString(),
  };

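  let queue = { messages: [] as unknown[] };
  try {
    const raw = await fs.readFile(queuePath, "utf-8");
    const existing = JSON.parse(raw);
    // Only reuse the file if it has the expected shape
    if (Array.isArray(existing?.messages)) queue = existing;
  } catch {
    // File is missing or not valid JSON: start with an empty queue
  }

  queue.messages.push(msg);
  // Create ~/.atlas on first use; writeFile does not create parent directories
  await fs.mkdir(path.dirname(queuePath), { recursive: true });
  await fs.writeFile(queuePath, JSON.stringify(queue, null, 2));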

  return { success: true, data: { id: msg.id } };
});

// ── Request Handler ─────────────────────────────────────────────────────────

async function handleDispatch(
  ws: WebSocket,
  request: DispatchRequest,
  machineId: string
): Promise<void> {
  const handler = CAPABILITY_HANDLERS.get(request.capability);

  if (!handler) {
    ws.send(
      JSON.stringify({
        type: "response",
        id: request.id,
        status: 404,
        body: {
          error: "unknown_capability",
          message: `Capability "${request.capability}" not registered`,
          available: Array.from(CAPABILITY_HANDLERS.keys()),
        },
      })
    );
    return;
  }

  // Report busy
  ws.send(
    JSON.stringify({
      type: "status-report",
      machine: machineId,
      busy: true,
      version: "0.1.0",
    })
  );

  let result: CapabilityResult;
  const startTime = Date.now();

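  // Handle to the timeout timer so it can be cleared once the race settles
  let timer: ReturnType<typeof setTimeout> | undefined;

  try {
    // Apply timeout if specified
    if (request.timeoutMs) {
      const timeoutMs = request.timeoutMs;
      result = await Promise.race([
        handler(request.input),
        new Promise<CapabilityResult>((_, reject) => {
          timer = setTimeout(
            () => reject(new Error("Capability execution timed out")),
            timeoutMs
          );
        }),
      ]);
    } else {
      result = await handler(request.input);
    }
  } catch (err) {
    result = {
      success: false,
      error: `Handler threw: ${err instanceof Error ? err.message : String(err)}`,
    };
  } finally {
    // Clear the timer so a finished handler does not leave it pending.
    // A handler that timed out is not cancelled; it keeps running in the background.
    if (timer) clearTimeout(timer);
  }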

  const durationMs = Date.now() - startTime;

  // Send response
  ws.send(
    JSON.stringify({
      type: "response",
      id: request.id,
      status: result.success ? 200 : 500,
      body: { ...result, durationMs },
    })
  );

  // Report not busy
  ws.send(
    JSON.stringify({
      type: "status-report",
      machine: machineId,
      busy: false,
      version: "0.1.0",
    })
  );
}
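One caveat with reporting `busy: true` and `busy: false` around each request: if two requests overlap, the first one to finish reports the machine as idle while the second is still running. A possible refinement is to track active request IDs and derive `busy` from that set, which also fills the `activeTasks` field of `StatusReport`. The sketch below assumes this approach; `ActiveTaskTracker` is a hypothetical helper, not part of the connector.

```typescript
// Sketch: ActiveTaskTracker is hypothetical. It derives `busy` from the set of
// in-flight request IDs instead of toggling a flag per request.
interface StatusReportMessage {
  type: "status-report";
  machine: string;
  busy: boolean;
  version: string;
  activeTasks: string[];
}

class ActiveTaskTracker {
  private readonly active = new Set<string>();
  private readonly machine: string;
  private readonly version: string;

  constructor(machine: string, version: string) {
    this.machine = machine;
    this.version = version;
  }

  /** Mark a request as started and return the report to send to the hub. */
  start(requestId: string): StatusReportMessage {
    this.active.add(requestId);
    return this.report();
  }

  /** Mark a request as finished and return the report to send to the hub. */
  finish(requestId: string): StatusReportMessage {
    this.active.delete(requestId);
    return this.report();
  }

  report(): StatusReportMessage {
    return {
      type: "status-report",
      machine: this.machine,
      busy: this.active.size > 0,
      version: this.version,
      activeTasks: Array.from(this.active),
    };
  }
}
```

In `handleDispatch`, the two status reports would then become `ws.send(JSON.stringify(tracker.start(request.id)))` and `ws.send(JSON.stringify(tracker.finish(request.id)))`.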

How Systems Discover What Machines Can Do

Capability discovery is the process of building and maintaining an index of "what can be done and where." Different systems take fundamentally different approaches.

Pattern 1: Central Registry (Consul, etcd)

Consul and etcd use a central key-value store as the source of truth. Machines register themselves, and consumers query the registry.

// Consul-style service registration
interface ConsulServiceRegistration {
  ID: string;
  Name: string;
  Tags: string[];
  Port: number;
  Meta: Record<string, string>;
  Check: {
    HTTP?: string;
    TCP?: string;
    Interval: string;
    Timeout: string;
    DeregisterCriticalServiceAfter: string;
  };
}

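// Register a service with the local Consul agent
async function registerWithConsul(
  consulAddr: string,
  service: ConsulServiceRegistration
): Promise<void> {
  const res = await fetch(`${consulAddr}/v1/agent/service/register`, {
    method: "PUT",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify(service),
  });
  if (!res.ok) {
    throw new Error(`Consul registration failed: ${res.status}`);
  }
}

// Shape of the entries returned by the catalog endpoint (subset of fields).
// The catalog returns its own format, not the registration payload.
interface ConsulCatalogService {
  Node: string;
  Address: string;
  ServiceID: string;
  ServiceName: string;
  ServiceTags: string[];
  ServicePort: number;
  ServiceMeta: Record<string, string>;
}

// Query the catalog for connectors tagged with a capability
async function findCapableServices(
  consulAddr: string,
  capability: string
): Promise<ConsulCatalogService[]> {
  const res = await fetch(
    `${consulAddr}/v1/catalog/service/atlas-connector` +
      `?tag=${encodeURIComponent(capability)}`
  );
  if (!res.ok) {
    throw new Error(`Consul catalog query failed: ${res.status}`);
  }
  return (await res.json()) as ConsulCatalogService[];
}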

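Trade-offs: lookups are a single low-latency query with global scope, but the registry server must be available for registration and discovery to work.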
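The tag query above only works if each capability was registered as a tag. As a rough sketch of how a connector's capabilities could be mapped onto a registration payload: `toConsulRegistration` is a hypothetical helper, and the `/health` URL and the interval values are assumptions, not something the Atlas connector is known to expose.

```typescript
// Sketch: maps Atlas capabilities onto Consul tags. The helper name, the
// /health endpoint and the timing values are assumptions for illustration.
interface ConsulRegistration {
  ID: string;
  Name: string;
  Tags: string[];
  Port: number;
  Meta: Record<string, string>;
  Check: {
    HTTP: string;
    Interval: string;
    Timeout: string;
    DeregisterCriticalServiceAfter: string;
  };
}

function toConsulRegistration(
  machineId: string,
  capabilities: string[],
  port: number
): ConsulRegistration {
  return {
    ID: `atlas-connector-${machineId}`, // unique per machine
    Name: "atlas-connector", // shared service name queried by consumers
    Tags: Array.from(new Set(capabilities)).sort(), // one tag per capability
    Port: port,
    Meta: { machineId },
    Check: {
      HTTP: `http://127.0.0.1:${port}/health`, // assumed health endpoint
      Interval: "10s",
      Timeout: "2s",
      DeregisterCriticalServiceAfter: "5m",
    },
  };
}

const registration = toConsulRegistration(
  "jane-macbook",
  ["jane-status", "hud-notify", "jane-status"],
  7654
);
```

All machines share one service name and differ only in ID and tags, which is what lets a single catalog query with `?tag=hud-notify` return every machine that offers that capability.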

Pattern 2: Gossip Protocol (Consul Serf, SWIM)

Consul itself uses a gossip protocol (Serf) for cluster membership and failure detection. Nodes communicate directly with each other, spreading membership information epidemically.

// Gossip-style peer discovery
interface GossipPeer {
  machineId: string;
  address: string;
  capabilities: string[];
  generation: number; // Lamport clock for conflict resolution
  status: "alive" | "suspect" | "dead";
  lastSeen: number;
}

class GossipMembership {
  private peers: Map<string, GossipPeer> = new Map();
  private self: GossipPeer;
  private fanout = 3; // Number of peers to gossip to per round

  constructor(selfPeer: GossipPeer) {
    this.self = selfPeer;
    this.peers.set(selfPeer.machineId, selfPeer);
  }

  /**
   * Gossip round: pick random peers and exchange membership lists.
   * Each round, every node talks to `fanout` other nodes.
   * Information propagates in O(log N) rounds.
   */
  async gossipRound(): Promise<void> {
    const alivePeers = Array.from(this.peers.values()).filter(
      (p) => p.status === "alive" && p.machineId !== this.self.machineId
    );

    // Pick random peers to gossip with
    const targets = this.pickRandom(alivePeers, this.fanout);

    for (const target of targets) {
      try {
        // Send our membership list, receive theirs
        const response = await this.sendGossip(target.address, {
          sender: this.self,
          members: Array.from(this.peers.values()),
        });

        // Merge received membership info
        for (const remotePeer of response.members) {
          this.mergePeer(remotePeer);
        }
      } catch {
        // Mark peer as suspect
        target.status = "suspect";
      }
    }
  }

  private mergePeer(remote: GossipPeer): void {
    const local = this.peers.get(remote.machineId);
    if (!local || remote.generation > local.generation) {
      this.peers.set(remote.machineId, remote);
    }
  }

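  private pickRandom<T>(arr: T[], n: number): T[] {
    // Fisher-Yates shuffle on a copy; sorting with a random comparator is biased
    const shuffled = [...arr];
    for (let i = shuffled.length - 1; i > 0; i--) {
      const j = Math.floor(Math.random() * (i + 1));
      [shuffled[i], shuffled[j]] = [shuffled[j], shuffled[i]];
    }
    return shuffled.slice(0, n);
  }

  private async sendGossip(
    address: string,
    payload: { sender: GossipPeer; members: GossipPeer[] }
  ): Promise<{ members: GossipPeer[] }> {
    const res = await fetch(`${address}/gossip`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify(payload),
    });
    // Treat HTTP errors like network errors so the caller marks the peer suspect
    if (!res.ok) {
      throw new Error(`Gossip to ${address} failed: ${res.status}`);
    }
    return res.json();
  }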
}
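The class above marks an unreachable peer as "suspect" but never promotes it to "dead", and its merge rule only compares generations. In SWIM-style protocols, a suspicion at the same generation overrides "alive", and a peer refutes the suspicion by announcing a higher generation. The following sketch shows those two rules as pure functions; the names `GossipRecord`, `resolveConflict`, and `promoteSuspects` and the timeout value are hypothetical.

```typescript
// Sketch of SWIM-style conflict resolution; all names here are hypothetical.
type PeerStatus = "alive" | "suspect" | "dead";

interface GossipRecord {
  machineId: string;
  generation: number;
  status: PeerStatus;
  lastSeen: number; // epoch milliseconds
}

const STATUS_RANK: Record<PeerStatus, number> = { alive: 0, suspect: 1, dead: 2 };

/** Decide which of two records about the same peer should be kept. */
function resolveConflict(local: GossipRecord, remote: GossipRecord): GossipRecord {
  if (remote.generation !== local.generation) {
    // A higher generation always wins; this is how a peer refutes a suspicion
    return remote.generation > local.generation ? remote : local;
  }
  // Same generation: the more severe status wins, so stale "alive" gossip
  // cannot erase a suspicion
  return STATUS_RANK[remote.status] > STATUS_RANK[local.status] ? remote : local;
}

/** Promote peers that stayed suspect longer than the timeout to dead. */
function promoteSuspects(
  records: GossipRecord[],
  nowMs: number,
  suspectTimeoutMs: number
): GossipRecord[] {
  return records.map((r): GossipRecord =>
    r.status === "suspect" && nowMs - r.lastSeen > suspectTimeoutMs
      ? { ...r, status: "dead" }
      : r
  );
}
```

The suspect window gives a slow but healthy peer time to refute the suspicion before the rest of the cluster stops routing to it.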

Pattern 3: mDNS / Bonjour (Zero-Configuration Discovery)

For local network discovery, mDNS (multicast DNS) and DNS-SD (DNS Service Discovery) let machines find each other without any central registry. On macOS, this is Bonjour. On Linux, it is Avahi.

import { Bonjour } from "bonjour-service";

const bonjour = new Bonjour();

// Advertise this machine's capabilities on the local network
function advertiseCapabilities(
  machineId: string,
  capabilities: string[],
  port: number
): void {
  bonjour.publish({
    name: `atlas-connector-${machineId}`,
    type: "atlas-connector",
    protocol: "tcp",
    port,
    txt: {
      machineId,
      capabilities: capabilities.join(","),
      version: "0.1.0",
      role: "node",
    },
  });
}

// Discover other Atlas connectors on the local network
function discoverPeers(
  onFound: (peer: {
    machineId: string;
    host: string;
    port: number;
    capabilities: string[];
  }) => void
): void {
  bonjour.find({ type: "atlas-connector" }, (service) => {
    const txt = service.txt as Record<string, string>;
    onFound({
      machineId: txt.machineId ?? service.name,
      host: service.host,
      port: service.port,
      capabilities: (txt.capabilities ?? "").split(",").filter(Boolean),
    });
  });
}

// Usage
advertiseCapabilities("jane-macbook", ["hud-notify", "jane-status"], 7654);

discoverPeers((peer) => {
  console.log(
    `Found peer: ${peer.machineId} at ${peer.host}:${peer.port}`,
    `capabilities: ${peer.capabilities.join(", ")}`
  );
});

Key insight: mDNS is perfect for LAN-only discovery but does not cross network boundaries. For a system like Atlas that spans local machines and cloud, you need both: mDNS for discovering peers on the same network, and a central registry (the hub) for cross-network coordination. Peers discovered via mDNS should still register with the hub.
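One way to combine the two sources is to treat the hub as the authority on which machines exist and mDNS as an optional shortcut to reach them. The sketch below assumes that design: `chooseRoute`, `LanPeer`, and `PeerRoute` are hypothetical names, and connecting directly to a LAN peer over `ws://` is an assumption rather than documented Atlas behavior.

```typescript
// Sketch: hub registration decides whether a peer is trusted; mDNS only
// decides how to reach it. All names are hypothetical.
interface LanPeer {
  machineId: string;
  host: string;
  port: number;
}

type PeerRoute =
  | { via: "lan"; url: string }
  | { via: "hub"; machineId: string };

function chooseRoute(
  targetId: string,
  lanPeers: Map<string, LanPeer>,
  hubMachineIds: Set<string>
): PeerRoute | null {
  // mDNS is unauthenticated, so ignore peers the hub does not know about
  if (!hubMachineIds.has(targetId)) return null;

  const lan = lanPeers.get(targetId);
  if (lan) {
    return { via: "lan", url: `ws://${lan.host}:${lan.port}` };
  }
  return { via: "hub", machineId: targetId };
}
```

A peer that shows up on the LAN but is missing from the hub registry is not routed to at all, which is why mDNS-discovered peers still need to register.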

Pattern 4: Kubernetes Labels and Taints

Kubernetes uses node labels, taints, and tolerations for capability-based scheduling. Labels describe what a node has. Taints describe what a node rejects.

// Kubernetes-inspired label-based capability matching
interface MachineLabels {
  // Standard labels
  "atlas.dev/platform": string;      // "darwin", "linux"
  "atlas.dev/arch": string;          // "arm64", "amd64"
  "atlas.dev/role": string;          // "node", "hub"

  // Capability labels
  "atlas.dev/gpu": string;           // "true", "false"
  "atlas.dev/gpu-model"?: string;    // "m2-ultra", "rtx-4090"
  "atlas.dev/gpu-vram"?: string;     // "64", "24" (GB)
  "atlas.dev/display": string;       // "true", "false"
  "atlas.dev/inference": string;     // "ollama", "vllm", "none"

  // Custom labels
  [key: string]: string | undefined;
}

// Taints: machine rejects work unless the request tolerates it
interface MachineTaint {
  key: string;
  value: string;
  effect: "NoSchedule" | "PreferNoSchedule" | "NoExecute";
}

// Selector: request specifies what it needs
interface CapabilitySelector {
  /** Required labels (AND logic) */
  matchLabels?: Record<string, string>;
  /** Label expressions (more flexible matching) */
  matchExpressions?: {
    key: string;
    operator: "In" | "NotIn" | "Exists" | "DoesNotExist" | "Gt" | "Lt";
    values?: string[];
  }[];
  /** Tolerate specific taints */
  tolerations?: {
    key: string;
    operator: "Equal" | "Exists";
    value?: string;
    effect?: MachineTaint["effect"];
  }[];
}

// Example: find a machine with GPU and display for inference + visualization
const inferenceSelector: CapabilitySelector = {
  matchLabels: {
    "atlas.dev/gpu": "true",
    "atlas.dev/display": "true",
  },
  matchExpressions: [
    {
      key: "atlas.dev/gpu-vram",
      operator: "Gt",
      values: ["16"], // More than 16 GB VRAM (Gt is strict)
    },
    {
      key: "atlas.dev/inference",
      operator: "In",
      values: ["ollama", "vllm"],
    },
  ],
};

function matchesMachine(
  machine: { labels: MachineLabels; taints?: MachineTaint[] },
  selector: CapabilitySelector
): boolean {
  // Check matchLabels
  if (selector.matchLabels) {
    for (const [key, value] of Object.entries(selector.matchLabels)) {
      if ((machine.labels as Record<string, string>)[key] !== value) {
        return false;
      }
    }
  }

  // Check matchExpressions
  if (selector.matchExpressions) {
    for (const expr of selector.matchExpressions) {
      const machineValue = (machine.labels as Record<string, string>)[
        expr.key
      ];
      switch (expr.operator) {
        case "In":
          if (!expr.values?.includes(machineValue ?? "")) return false;
          break;
        case "NotIn":
          if (expr.values?.includes(machineValue ?? "")) return false;
          break;
        case "Exists":
          if (machineValue === undefined) return false;
          break;
        case "DoesNotExist":
          if (machineValue !== undefined) return false;
          break;
        case "Gt":
          // Negated form so a missing or non-numeric label (NaN) fails the check
          if (!(Number(machineValue) > Number(expr.values?.[0]))) return false;
          break;
        case "Lt":
          if (!(Number(machineValue) < Number(expr.values?.[0]))) return false;
          break;
      }
    }
  }

  // Check taints vs tolerations
  if (machine.taints) {
    for (const taint of machine.taints) {
      if (taint.effect === "NoSchedule" || taint.effect === "NoExecute") {
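        const tolerated = selector.tolerations?.some(
          (t) =>
            t.key === taint.key &&
            (t.operator === "Exists" || t.value === taint.value) &&
            // An omitted effect tolerates every effect, as in Kubernetes
            (t.effect === undefined || t.effect === taint.effect)
        );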
        if (!tolerated) return false;
      }
    }
  }

  return true;
}
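The matcher treats `NoSchedule` and `NoExecute` as hard filters and skips `PreferNoSchedule`, which is a soft preference rather than a rejection. A scheduler could use it to rank the machines that survive filtering. The following is a minimal sketch under that assumption; the type and function names are hypothetical, and the `atlas.dev/interactive` taint key in the usage note is invented for illustration.

```typescript
// Sketch: rank candidates by how many PreferNoSchedule taints the request
// does not tolerate. All names are hypothetical.
interface SoftTaint {
  key: string;
  value: string;
  effect: "NoSchedule" | "PreferNoSchedule" | "NoExecute";
}

interface TaintToleration {
  key: string;
  operator: "Equal" | "Exists";
  value?: string;
}

interface TaintedMachine {
  machineId: string;
  taints: SoftTaint[];
}

function isTolerated(taint: SoftTaint, tolerations: TaintToleration[]): boolean {
  return tolerations.some(
    (t) =>
      t.key === taint.key &&
      (t.operator === "Exists" || t.value === taint.value)
  );
}

/** Count of soft taints the request does not tolerate (lower is better). */
function softTaintPenalty(
  machine: TaintedMachine,
  tolerations: TaintToleration[]
): number {
  return machine.taints.filter(
    (t) => t.effect === "PreferNoSchedule" && !isTolerated(t, tolerations)
  ).length;
}

/** Return a new array with the least-penalized machines first. */
function rankBySoftTaints(
  machines: TaintedMachine[],
  tolerations: TaintToleration[]
): TaintedMachine[] {
  return machines
    .slice()
    .sort(
      (a, b) =>
        softTaintPenalty(a, tolerations) - softTaintPenalty(b, tolerations)
    );
}
```

For example, a laptop tainted with `atlas.dev/interactive=true:PreferNoSchedule` would sort behind an untainted server for batch work, but would still be used if it were the only machine with the required labels.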

Comparison: Discovery Approaches

| Approach | Scope | Latency | Dependencies | Best For |
| --- | --- | --- | --- | --- |
| Central Registry (Consul/etcd) | Global | Low (single query) | Registry server must be available | Cloud-native services, cross-DC |
| Gossip Protocol (Serf/SWIM) | Cluster | O(log N) rounds | None (peer-to-peer) | Large clusters, failure detection |
| mDNS/Bonjour | LAN only | Sub-second | None (multicast) | Local network, zero-config |
| K8s Labels | Cluster | Low (API server query) | Kubernetes API server | Container workloads |
| Tailscale | Global mesh | Low | Tailscale coordination server | Secure cross-network connectivity |
| Hub-spoke (Atlas pattern) | Global | Low (hub query) | Hub must be available | AI agent platforms, mixed networks |

For Atlas, the answer is a hybrid: mDNS for LAN peer discovery + cloud hub for global coordination. Machines on the same network find each other instantly via mDNS. All machines register with the cloud hub for cross-network routing and pipeline orchestration.


Node: The Worker

A node is a machine that registers capabilities and executes dispatched work. It does not make routing decisions; it receives requests and returns results.

class AtlasNode {
  private config: ConnectorConfig;
  private ws: WebSocket | null = null;
  private capabilities: Map<string, CapabilityHandler> = new Map();
  private reconnectAttempt = 0;
  private heartbeatTimer: ReturnType<typeof setInterval> | null = null;

  constructor(config: ConnectorConfig) {
    this.config = config;
  }

  /**
   * Register a capability handler on this node.
   */
  addCapability(name: string, handler: CapabilityHandler): void {
    this.capabilities.set(name, handler);
  }

  /**
   * Start the node: register with hub, connect WebSocket, begin heartbeat.
   */
  async start(): Promise<void> {
    await this.register();
    this.connect();
  }

  private async register(): Promise<void> {
    const capabilities = Array.from(this.capabilities.keys()).map((name) => ({
      name,
      spec: { description: `Capability: ${name}` },
    }));

    const body = {
      machine_id: this.config.machineId,
      label: this.config.machineLabel,
      endpoint: this.config.runnerWsUrl,
      capabilities,
      resources: {
        platform: os.platform(),
        arch: os.arch(),
        cpus: os.cpus().length,
        memory_gb: +(os.totalmem() / 1e9).toFixed(1),
      },
      ttl_seconds: this.config.registrationTtlSeconds,
    };

    const res = await fetch(
      `${this.config.apiMomUrl}/v1/machines/register`,
      {
        method: "POST",
        headers: {
          "Content-Type": "application/json",
          Authorization: `Bearer ${this.config.secret}`,
        },
        body: JSON.stringify(body),
      }
    );

    if (!res.ok) {
      throw new Error(`Registration failed: ${res.status}`);
    }
  }

  private connect(): void {
    this.ws = new WebSocket(this.config.runnerWsUrl, {
      headers: { Authorization: `Bearer ${this.config.secret}` },
    });

    this.ws.on("open", () => {
      this.reconnectAttempt = 0;
      this.startHeartbeat();
    });

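    this.ws.on("message", (raw) => {
      let msg;
      try {
        msg = JSON.parse(raw.toString());
      } catch {
        return; // Ignore malformed frames instead of crashing the connector
      }
      if (msg.type === "request") {
        // handleRequest reports its own failures, so it is safe not to await it
        void this.handleRequest(msg);
      } else if (msg.type === "ping") {
        // PongMessage requires a timestamp; echo the one from the ping
        this.ws?.send(
          JSON.stringify({ type: "pong", timestamp: msg.timestamp })
        );
      }
    });

    // Without an "error" listener, a failed connection throws an unhandled
    // error event. The "close" event that follows triggers the reconnect.
    this.ws.on("error", () => {});

    this.ws.on("close", () => {
      this.stopHeartbeat();
      this.scheduleReconnect();
    });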
  }

  private async handleRequest(msg: DispatchRequest): Promise<void> {
    const handler = this.capabilities.get(msg.capability);
    if (!handler) {
      this.ws?.send(
        JSON.stringify({
          type: "response",
          id: msg.id,
          status: 404,
          body: { error: "unknown_capability" },
        })
      );
      return;
    }

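    let result: CapabilityResult;
    try {
      result = await handler(msg.input);
    } catch (err) {
      // A throwing handler must still produce a response; otherwise the hub
      // waits for its timeout
      result = {
        success: false,
        error: err instanceof Error ? err.message : String(err),
      };
    }
    this.ws?.send(
      JSON.stringify({
        type: "response",
        id: msg.id,
        status: result.success ? 200 : 500,
        body: result,
      })
    );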
  }

  private startHeartbeat(): void {
    this.heartbeatTimer = setInterval(async () => {
      await fetch(`${this.config.apiMomUrl}/v1/machines/heartbeat`, {
        method: "POST",
        headers: {
          "Content-Type": "application/json",
          Authorization: `Bearer ${this.config.secret}`,
        },
        body: JSON.stringify({
          machine_id: this.config.machineId,
          capabilities: Array.from(this.capabilities.keys()),
          resources: {
            platform: os.platform(),
            cpus: os.cpus().length,
            memory_gb: +(os.totalmem() / 1e9).toFixed(1),
            free_memory_gb: +(os.freemem() / 1e9).toFixed(1),
            load_avg: os.loadavg(),
          },
        }),
      }).catch(() => {});
    }, this.config.heartbeatIntervalMs);
  }

  private stopHeartbeat(): void {
    if (this.heartbeatTimer) {
      clearInterval(this.heartbeatTimer);
      this.heartbeatTimer = null;
    }
  }

  private scheduleReconnect(): void {
    const delay = Math.min(1000 * 2 ** this.reconnectAttempt, 60_000);
    const jitter = delay * (0.8 + Math.random() * 0.4);
    this.reconnectAttempt++;
    setTimeout(() => this.connect(), jitter);
  }
}
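The reconnect logic is easier to reason about as a pure function. This sketch extracts the same formula used in `scheduleReconnect` (exponential backoff capped at 60 seconds, then ±20% jitter); the function name `reconnectDelayMs` and the injectable `random` parameter are additions for illustration and testing.

```typescript
// Sketch: the backoff formula from scheduleReconnect as a pure function.
// The injectable `random` parameter is an addition to make it testable.
function reconnectDelayMs(
  attempt: number,
  random: () => number = Math.random,
  baseMs = 1000,
  capMs = 60_000
): number {
  // Exponential growth: 1s, 2s, 4s, ... capped at capMs
  const backoff = Math.min(baseMs * 2 ** attempt, capMs);
  // Jitter factor in [0.8, 1.2) spreads reconnects from many nodes apart
  return backoff * (0.8 + random() * 0.4);
}

// Delay bounds for the first few attempts
const delayBounds = [0, 1, 2, 3, 10].map((attempt) => ({
  attempt,
  minMs: reconnectDelayMs(attempt, () => 0),
  maxMs: reconnectDelayMs(attempt, () => 1),
}));
```

Because jitter is applied after the cap, the longest possible delay is about 72 seconds rather than 60. The jitter matters most after a hub restart, when every node loses its connection at the same moment and would otherwise retry in lockstep.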

Hub: The Coordinator

The hub maintains the machine registry, routes capability requests, and orchestrates pipelines. In Atlas, the hub is a Cloudflare Durable Object: it is addressable from anywhere, runs as a single instance per ID (so registry updates are serialized), and has persistent storage for state that must survive restarts.

class AtlasHub {
  private machines: Map<string, RegisteredMachine> = new Map();
  private capabilityIndex: Map<string, Set<string>> = new Map();
  private connections: Map<string, WebSocket> = new Map();
  private pipelines: Map<string, PipelineDefinition> = new Map();

  /**
   * Register a machine. Called via POST /v1/machines/register.
   */
  registerMachine(registration: MachineRegistration): void {
    const machine: RegisteredMachine = {
      ...registration,
      registeredAt: Date.now(),
      lastHeartbeat: Date.now(),
      busy: false,
      activeTasks: [],
      version: "0.1.0",
      connected: false,
    };

    this.machines.set(registration.machineId, machine);

    // Update capability index
    for (const cap of registration.capabilities) {
      if (!this.capabilityIndex.has(cap.name)) {
        this.capabilityIndex.set(cap.name, new Set());
      }
      this.capabilityIndex.get(cap.name)!.add(registration.machineId);
    }
  }

  /**
   * Get the full capability map: what is available across all machines.
   */
  getCapabilityMap(): Map<string, string[]> {
    const map = new Map<string, string[]>();
    for (const [capability, machineIds] of this.capabilityIndex) {
      map.set(capability, Array.from(machineIds));
    }
    return map;
  }

  /**
   * Execute a pipeline by dispatching each step to the appropriate machine.
   */
  async executePipeline(
    pipelineId: string,
    initialInput: unknown
  ): Promise<PipelineResult> {
    const pipeline = this.pipelines.get(pipelineId);
    if (!pipeline) {
      throw new Error(`Pipeline "${pipelineId}" not found`);
    }

    let currentInput = initialInput;
    const stepResults: StepResult[] = [];

    for (let i = 0; i < pipeline.steps.length; i++) {
      const step = pipeline.steps[i];
      // Honor inputOverride when the step defines one
      const stepInput =
        step.inputOverride !== undefined ? step.inputOverride : currentInput;
      const startedAt = Date.now();

      try {
        const { machineId, result } = await this.dispatchToCapability(
          step.capability,
          stepInput,
          {
            timeoutMs: step.timeoutMs,
            pipelineId,
            stepIndex: i,
          }
        );

        const stepResult: StepResult = {
          step: i,
          capability: step.capability,
          machineId,
          status: "completed",
          result,
          durationMs: Date.now() - startedAt,
        };

        stepResults.push(stepResult);

        // Transform output for next step
        if (step.outputTransform) {
          currentInput = step.outputTransform(result);
        } else {
          currentInput = result;
        }
      } catch (error) {
        const stepResult: StepResult = {
          step: i,
          capability: step.capability,
          machineId: "none",
          status: "failed",
          error: error instanceof Error ? error.message : String(error),
          durationMs: Date.now() - startedAt,
        };

        stepResults.push(stepResult);

        // Check retry policy
        if (step.retryPolicy && step.retryPolicy.maxRetries > 0) {
          // Retry logic would go here
        }

        if (!step.continueOnFailure) {
          return {
            pipelineId,
            status: "failed",
            steps: stepResults,
            failedAt: i,
          };
        }
      }
    }

    return {
      pipelineId,
      status: "completed",
      steps: stepResults,
      output: currentInput,
    };
  }

  // ... dispatchToCapability from earlier
  async dispatchToCapability(
    capabilityName: string,
    input: unknown,
    options?: { timeoutMs?: number; pipelineId?: string; stepIndex?: number }
  ): Promise<{ machineId: string; result: unknown }> {
    // Implementation from the Registry section above
    throw new Error("See full implementation in Registry section");
  }
}

Chain: The Pipeline

A chain is a sequence of capabilities that are executed in order, with the output of each step becoming the input of the next. The chain definition is declarative: it says what to do, not where to do it. The hub resolves each step to a machine at execution time.

interface PipelineDefinition {
  id: string;
  name: string;
  description: string;
  steps: PipelineStep[];
  /** Global timeout for the entire pipeline */
  timeoutMs?: number;
  /** Retry the entire pipeline on failure */
  retryPolicy?: RetryPolicy;
}

interface PipelineStep {
  /** Capability to invoke */
  capability: string;
  /** Optional: override the input (instead of using previous step's output) */
  inputOverride?: unknown;
  /** Transform the output before passing to the next step */
  outputTransform?: (output: unknown) => unknown;
  /** Timeout for this step */
  timeoutMs?: number;
  /** Retry policy for this step */
  retryPolicy?: RetryPolicy;
  /** Continue the pipeline even if this step fails */
  continueOnFailure?: boolean;
  /** Capability selector constraints (e.g., "must have GPU") */
  selector?: CapabilitySelector;
}

interface RetryPolicy {
  maxRetries: number;
  backoffMs: number;
  backoffMultiplier: number;
  maxBackoffMs: number;
}

interface StepResult {
  step: number;
  capability: string;
  machineId: string;
  status: "completed" | "failed" | "skipped";
  result?: unknown;
  error?: string;
  durationMs: number;
}

interface PipelineResult {
  pipelineId: string;
  status: "completed" | "failed";
  steps: StepResult[];
  output?: unknown;
  failedAt?: number;
}

// ── Example Pipeline Definitions ────────────────────────────────────────────

// Pipeline: Scrape → Summarize → Notify
const scrapeAndNotifyPipeline: PipelineDefinition = {
  id: "scrape-summarize-notify",
  name: "Scrape and Notify",
  description:
    "Scrape a URL, summarize the content, and send a HUD notification",
  steps: [
    {
      capability: "web-scrape",
      timeoutMs: 30_000,
      retryPolicy: {
        maxRetries: 2,
        backoffMs: 1000,
        backoffMultiplier: 2,
        maxBackoffMs: 10_000,
      },
    },
    {
      capability: "llm-summarize",
      selector: {
        matchLabels: { "atlas.dev/inference": "ollama" },
      },
      timeoutMs: 60_000,
      // outputTransform runs on this step's result and shapes the input
      // of the next step, so the notification payload is built here.
      outputTransform: (summary) => ({
        severity: "green",
        message: `Summary: ${(summary as { text: string }).text}`,
        source: "pipeline/scrape-summarize-notify",
      }),
    },
    {
      capability: "hud-notify",
      selector: {
        matchLabels: { "atlas.dev/display": "true" },
      },
    },
  ],
  timeoutMs: 120_000,
};

// Pipeline: Inference → Validate → Deploy
const inferenceDeployPipeline: PipelineDefinition = {
  id: "inference-validate-deploy",
  name: "Inference and Deploy",
  description: "Run inference, validate the result, deploy to production",
  steps: [
    {
      capability: "gpu-inference",
      selector: {
        matchLabels: { "atlas.dev/gpu": "true" },
        matchExpressions: [
          { key: "atlas.dev/gpu-vram", operator: "Gt", values: ["8"] },
        ],
      },
      timeoutMs: 120_000,
    },
    {
      capability: "validate-output",
      timeoutMs: 10_000,
    },
    {
      capability: "deploy-result",
      timeoutMs: 30_000,
      retryPolicy: {
        maxRetries: 3,
        backoffMs: 2000,
        backoffMultiplier: 2,
        maxBackoffMs: 30_000,
      },
    },
  ],
};
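
The selector fields in these definitions constrain which machines may serve a step. The sketch below shows one way such a selector could be evaluated against a machine's labels. The `CapabilitySelector` shape is inferred from the examples above, and `matchesSelector` and the operator set (`In`, `Exists`, `Gt`, `Lt`) are assumptions for illustration, not the Atlas implementation.

```typescript
// Hypothetical selector shape, inferred from the pipeline examples.
type SelectorOperator = "In" | "Exists" | "Gt" | "Lt";

interface SelectorExpression {
  key: string;
  operator: SelectorOperator;
  values?: string[];
}

interface CapabilitySelector {
  matchLabels?: Record<string, string>;
  matchExpressions?: SelectorExpression[];
}

// Returns true when the machine's labels satisfy every constraint.
function matchesSelector(
  labels: Record<string, string>,
  selector: CapabilitySelector
): boolean {
  // matchLabels: every listed label must be present with an equal value
  for (const [key, expected] of Object.entries(selector.matchLabels ?? {})) {
    if (labels[key] !== expected) return false;
  }

  for (const expr of selector.matchExpressions ?? []) {
    const actual: string | undefined = labels[expr.key];
    const values = expr.values ?? [];
    switch (expr.operator) {
      case "Exists":
        if (actual === undefined) return false;
        break;
      case "In":
        if (actual === undefined || !values.includes(actual)) return false;
        break;
      case "Gt":
      case "Lt": {
        // Numeric comparison against the first value; a missing or
        // non-numeric label never matches.
        const left = Number(actual);
        const right = Number(values[0]);
        if (actual === undefined || Number.isNaN(left) || Number.isNaN(right)) {
          return false;
        }
        if (expr.operator === "Gt" ? !(left > right) : !(left < right)) {
          return false;
        }
        break;
      }
    }
  }
  return true;
}

// The selector from the gpu-inference step above
const gpuSelector: CapabilitySelector = {
  matchLabels: { "atlas.dev/gpu": "true" },
  matchExpressions: [
    { key: "atlas.dev/gpu-vram", operator: "Gt", values: ["8"] },
  ],
};
```

Under these assumptions, a machine labeled with `atlas.dev/gpu-vram: "24"` passes the check while one labeled `"8"` does not, because `Gt` is strict.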

How Temporal Does It

Temporal.io uses a similar pattern with Task Queues. Workers subscribe to task queues. Workflows dispatch activities to queues. The Temporal server routes tasks to available workers.

The key differences from the Atlas approach:

| Aspect | Temporal | Atlas |
| --- | --- | --- |
| Worker binding | Workers poll specific task queues | Machines register capabilities, hub pushes work |
| Task routing | Queue name is the routing key | Capability name + label selector |
| State persistence | Temporal server (Cassandra/PostgreSQL) | Durable Object (Cloudflare) |
| Failure handling | Built-in activity retries, heartbeats, timeouts | Custom retry policy per step |
| Workflow definition | Code (TypeScript/Go/Java SDK) | Declarative pipeline JSON |

Temporal's approach is more mature and battle-tested for complex workflows. Atlas's approach is simpler, cloud-native (Cloudflare DOs), and designed specifically for AI agent coordination rather than general-purpose workflow orchestration.

How Airflow Does It

Apache Airflow uses DAGs (Directed Acyclic Graphs) to define pipelines. With the CeleryExecutor, each task can name a queue, and only workers listening on that queue pick it up. Pools are a separate mechanism that caps how many tasks of a kind run concurrently. The scheduler determines when tasks run based on dependencies and available slots.

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

# scrape_url, summarize_text, and send_notification are assumed to be
# defined elsewhere in the DAG file.

with DAG("scrape_summarize_notify", start_date=datetime(2026, 1, 1)) as dag:
    scrape = PythonOperator(
        task_id="web_scrape",
        python_callable=scrape_url,
        queue="network_workers",  # Airflow's version of capability routing
    )
    summarize = PythonOperator(
        task_id="llm_summarize",
        python_callable=summarize_text,
        queue="gpu_workers",
    )
    notify = PythonOperator(
        task_id="hud_notify",
        python_callable=send_notification,
        queue="display_workers",
    )
    scrape >> summarize >> notify

Airflow queues are the closest analog to Atlas capability routing, but they are static (a worker's queues are fixed when it starts, and pools are configured in the Airflow metadata database) while Atlas capabilities are dynamic (discovered at registration time).


The Machine Identity Problem

When a new machine joins the network, three things must be true:

  1. The machine is who it claims to be. A malicious machine cannot impersonate a legitimate one.
  2. The hub is who it claims to be. A MITM cannot intercept the registration and redirect the machine to a fake hub.
  3. Communication is encrypted. No eavesdropping on capability invocations.

Token-Based Authentication (Current Atlas Pattern)

The simplest approach, and the one used by k3s, Atlas, and many distributed systems, is pre-shared tokens.

// The install script receives a token
// curl -sfL https://install.atlas.dev | sh -s -- --token "$ATLAS_TOKEN"

// The connector uses it for all communication
const config: ConnectorConfig = {
  apiMomUrl: "https://api-mom.garywu.dev",
  secret: process.env.ATLAS_TOKEN!, // Pre-shared secret
  machineId: readMachineId(),
  // ...
};

// Registration: Bearer token in Authorization header
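await fetch(`${config.apiMomUrl}/v1/machines/register`, {
  method: "POST", // fetch defaults to GET, which cannot carry a body
  headers: {
    "Content-Type": "application/json",
    Authorization: `Bearer ${config.secret}`,
  },
  body: JSON.stringify(registrationPayload),
});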

// WebSocket: Bearer token in handshake
const ws = new WebSocket(config.runnerWsUrl, {
  headers: {
    Authorization: `Bearer ${config.secret}`,
  },
});
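
On the hub side, the matching check could look like the sketch below. It is a minimal illustration, not the Atlas hub code: `isAuthorized` and `sha256` are hypothetical helpers, and hashing both values before comparing is one common way to satisfy the equal-length requirement of `crypto.timingSafeEqual`.

```typescript
import { createHash, timingSafeEqual } from "node:crypto";

// Hash both sides first so the buffers always have equal length, which
// timingSafeEqual requires.
function sha256(value: string): Buffer {
  return createHash("sha256").update(value, "utf8").digest();
}

// Hypothetical hub-side check for the shared-secret scheme.
function isAuthorized(
  authorizationHeader: string | undefined,
  expectedToken: string
): boolean {
  if (!authorizationHeader) return false;

  // Accept only the "Bearer <token>" form used by the connector
  const match = /^Bearer (.+)$/.exec(authorizationHeader);
  if (!match) return false;

  // Constant-time comparison avoids leaking how many characters matched
  return timingSafeEqual(sha256(match[1]), sha256(expectedToken));
}
```

A plain `===` comparison would also work functionally; the constant-time variant is a defensive choice for values that act as credentials.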

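Limitations: every machine shares the same secret, so one leaked token compromises the whole network, and access cannot be revoked for a single machine without replacing the token everywhere.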

Per-Machine Keys with Registration Tokens

A better approach: use a one-time registration token to bootstrap per-machine credentials.

interface RegistrationFlow {
  // Step 1: Admin generates a one-time registration token
  // POST /v1/admin/registration-tokens
  // Returns: { token: "reg_abc123...", expiresAt: "...", maxUses: 1 }

  // Step 2: Machine uses the token to register and get permanent credentials
  // POST /v1/machines/register
  // Authorization: Bearer reg_abc123...
  // Returns: { machine_token: "mach_xyz...", hub_public_key: "...", certificate: "..." }

  // Step 3: Machine uses its permanent token for all further communication
  // Authorization: Bearer mach_xyz...
}

// Implementation
async function bootstrapMachineCredentials(
  hubUrl: string,
  registrationToken: string,
  machineId: string,
  publicKey: string
): Promise<{
  machineToken: string;
  hubPublicKey: string;
  certificate: string;
}> {
  const res = await fetch(`${hubUrl}/v1/machines/register`, {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      Authorization: `Bearer ${registrationToken}`,
    },
    body: JSON.stringify({
      machine_id: machineId,
      public_key: publicKey,
      // Challenge: prove we have the private key
      challenge_response: await signChallenge(machineId),
    }),
  });

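  if (res.status === 401) {
    throw new Error("Registration token expired or invalid");
  }
  if (!res.ok) {
    throw new Error(`Registration failed with HTTP ${res.status}`);
  }

  const credentials = (await res.json()) as {
    machine_token: string;
    hub_public_key: string;
    certificate: string;
  };

  // Store credentials securely
  const fs = await import("node:fs/promises");
  const os = await import("node:os");
  const path = await import("node:path");
  const atlasDir = path.join(os.homedir(), ".atlas");
  // The directory may not exist yet on a freshly installed machine
  await fs.mkdir(atlasDir, { recursive: true, mode: 0o700 });
  const credPath = path.join(atlasDir, "credentials.json");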

  await fs.writeFile(
    credPath,
    JSON.stringify(
      {
        machineId,
        machineToken: credentials.machine_token,
        hubPublicKey: credentials.hub_public_key,
        certificate: credentials.certificate,
        issuedAt: new Date().toISOString(),
      },
      null,
      2
    ),
    { mode: 0o600 }
  );

  return {
    machineToken: credentials.machine_token,
    hubPublicKey: credentials.hub_public_key,
    certificate: credentials.certificate,
  };
}
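
The `signChallenge` helper is not shown above. One possible shape, sketched with Node's built-in Ed25519 support, follows. Note the assumptions: this version signs a hub-issued nonce together with the machine ID (so the signature cannot be replayed), which differs from the single-argument call in the function above, and `verifyChallenge` is a hypothetical hub-side counterpart.

```typescript
import { generateKeyPairSync, sign, verify, KeyObject } from "node:crypto";

// Machine side: generate a key pair once and keep the private key local.
const { publicKey, privateKey } = generateKeyPairSync("ed25519");

// Machine side: sign the hub-issued nonce together with the machine ID.
function signChallenge(
  machineId: string,
  nonce: string,
  key: KeyObject
): string {
  const payload = Buffer.from(`${machineId}:${nonce}`, "utf8");
  // Ed25519 takes no separate digest algorithm, hence the null argument
  return sign(null, payload, key).toString("base64");
}

// Hub side: check the signature against the public key sent at registration.
function verifyChallenge(
  machineId: string,
  nonce: string,
  signatureBase64: string,
  key: KeyObject
): boolean {
  const payload = Buffer.from(`${machineId}:${nonce}`, "utf8");
  return verify(null, payload, key, Buffer.from(signatureBase64, "base64"));
}
```

Binding the machine ID into the signed payload means a signature produced for one machine does not verify for another, even with the same nonce.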

mTLS for Machine-to-Machine Communication

For the highest security, machines use mutual TLS (mTLS) to authenticate to each other. The hub acts as a Certificate Authority (CA), issuing certificates during registration.

import * as tls from "node:tls";
import * as fs from "node:fs";
import * as os from "node:os";

// Machine-to-machine connection with mTLS
function createMtlsConnection(
  targetHost: string,
  targetPort: number
): tls.TLSSocket {
  const atlasDir = `${os.homedir()}/.atlas`;

  return tls.connect({
    host: targetHost,
    port: targetPort,
    // Our certificate (issued by the hub CA)
    cert: fs.readFileSync(`${atlasDir}/certs/machine.crt`),
    key: fs.readFileSync(`${atlasDir}/keys/machine.key`),
    // Trust only the hub's CA
    ca: fs.readFileSync(`${atlasDir}/certs/hub-ca.crt`),
    // Reject connections to machines not signed by the hub CA
    rejectUnauthorized: true,
    // requestCert is a server-side option: the listening peer enforces the
    // other half of mutual auth by passing requestCert: true and
    // rejectUnauthorized: true to tls.createServer()
  });
}

// Hub-side: issue a machine certificate during registration
async function issueMachineCertificate(
  machineId: string,
  publicKey: string,
  capabilities: string[]
): Promise<string> {
  // In practice, you would use a proper CA library (e.g., step-ca, CFSSL)
  // This is the concept:
  const certificate = await signCertificate({
    subject: {
      CN: machineId,
      O: "Atlas Network",
      OU: capabilities.join(","),
    },
    publicKey,
    validity: {
      notBefore: new Date(),
      notAfter: new Date(Date.now() + 365 * 24 * 60 * 60 * 1000), // 1 year
    },
    extensions: {
      // Machine capabilities encoded in the certificate
      subjectAltName: capabilities.map((c) => `URI:atlas:capability:${c}`),
    },
  });

  return certificate;
}
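
If capabilities are encoded as URI entries in the certificate as above, the receiving machine can read them back from the peer certificate. The sketch below assumes the comma-separated `subjectaltname` string that Node's `getPeerCertificate()` returns (for example `"URI:atlas:capability:echo, DNS:node-a"`); `capabilitiesFromSubjectAltName` and `peerMayInvoke` are hypothetical helper names.

```typescript
// Prefix used by issueMachineCertificate for capability entries
const CAPABILITY_PREFIX = "URI:atlas:capability:";

// Extract capability names from a peer certificate's subjectAltName string.
function capabilitiesFromSubjectAltName(
  subjectAltName: string | undefined
): string[] {
  if (!subjectAltName) return [];
  return subjectAltName
    .split(",")
    .map((entry) => entry.trim())
    .filter((entry) => entry.startsWith(CAPABILITY_PREFIX))
    .map((entry) => entry.slice(CAPABILITY_PREFIX.length))
    .filter((name) => name.length > 0);
}

// True when the peer's certificate lists the requested capability.
function peerMayInvoke(
  subjectAltName: string | undefined,
  capability: string
): boolean {
  return capabilitiesFromSubjectAltName(subjectAltName).includes(capability);
}
```

In a TLS server's connection handler this would typically be called with `socket.getPeerCertificate().subjectaltname` after the handshake has been authorized.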

Token Rotation

Tokens should rotate periodically. The heartbeat is the natural rotation point.

interface TokenRotation {
  /** Current token */
  currentToken: string;
  /** Previous token (still accepted for a grace period) */
  previousToken?: string;
  /** When the current token was issued */
  issuedAt: number;
  /** When the current token expires */
  expiresAt: number;
  /** Grace period: how long previousToken is still accepted (ms) */
  gracePeriodMs: number;
}

// Hub-side: rotate tokens during heartbeat
function handleHeartbeat(
  machineId: string,
  token: string,
  rotation: TokenRotation
): HeartbeatResponse {
  const now = Date.now();

  // Check if token needs rotation
  const tokenAge = now - rotation.issuedAt;
  const maxTokenAge = 24 * 60 * 60 * 1000; // 24 hours

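  if (tokenAge > maxTokenAge) {
    const newToken = generateSecureToken();

    // Record the rotation; otherwise every later heartbeat would see the
    // same stale issuedAt and mint yet another token
    rotation.previousToken = rotation.currentToken;
    rotation.currentToken = newToken;
    rotation.issuedAt = now;
    rotation.expiresAt = now + maxTokenAge;

    return {
      registered: true,
      tokenRotation: {
        newToken,
        effectiveAt: new Date(now + 60_000).toISOString(), // switch after 1 min
        // Old token stays valid for the grace period (e.g. 5 min)
        oldTokenExpiresAt: new Date(now + rotation.gracePeriodMs).toISOString(),
      },
    };
  }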

  return { registered: true };
}

// Connector-side: handle token rotation from heartbeat response
async function applyTokenRotation(
  response: HeartbeatResponse,
  configPath: string
): Promise<void> {
  if (!response.tokenRotation) return;

  const { newToken, effectiveAt } = response.tokenRotation;

  // Schedule the token switch
  const switchAt = new Date(effectiveAt).getTime() - Date.now();

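  setTimeout(async () => {
    try {
      // fs is node:fs/promises here
      const config = JSON.parse(await fs.readFile(configPath, "utf-8"));
      config.previousToken = config.token;
      config.token = newToken;
      // Keep the token file readable by the owner only
      await fs.writeFile(configPath, JSON.stringify(config, null, 2), {
        mode: 0o600,
      });

      console.log("[connector] Token rotated successfully");
    } catch (err) {
      // A throw inside a timer callback would otherwise go unhandled
      console.error("[connector] Token rotation failed:", err);
    }
  }, Math.max(0, switchAt));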
}
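
During the grace period the hub has to accept both the new and the previous token. A minimal sketch of that check follows, along with one possible `generateSecureToken`. The `RotationState` shape is a trimmed-down assumption based on `TokenRotation`, and the `mach_` prefix simply mirrors the token format shown earlier.

```typescript
import { randomBytes } from "node:crypto";

// Trimmed-down rotation state (assumption, based on TokenRotation above)
interface RotationState {
  currentToken: string;
  previousToken?: string;
  /** Epoch ms at which currentToken was issued */
  issuedAt: number;
  /** How long previousToken stays valid after a rotation (ms) */
  gracePeriodMs: number;
}

// 32 random bytes, hex-encoded, give a 256-bit token
function generateSecureToken(): string {
  return `mach_${randomBytes(32).toString("hex")}`;
}

// Accept the current token always, and the previous one only while the
// grace period since the last rotation has not elapsed.
function isTokenAccepted(
  presented: string,
  state: RotationState,
  now: number = Date.now()
): boolean {
  if (presented === state.currentToken) return true;
  const withinGrace = now - state.issuedAt <= state.gracePeriodMs;
  return (
    state.previousToken !== undefined &&
    presented === state.previousToken &&
    withinGrace
  );
}
```

The string comparisons here use `===` for brevity; a production check would more likely compare in constant time.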

Security Comparison

| Approach | Complexity | Security | Operations Cost |
| --- | --- | --- | --- |
| Shared secret (current Atlas) | Low | Low: one leaked token compromises everything | Low: one env var |
| Per-machine tokens (k3s pattern) | Medium | Medium: leaked token compromises one machine | Medium: manage per-machine tokens |
| Registration token + per-machine creds | Medium-High | High: registration tokens are ephemeral, machine tokens are unique | Medium: token rotation needed |
| mTLS with hub CA | High | Very High: certificates, mutual auth, capability encoding | High: CA management, cert renewal |
| Tailscale (WireGuard mesh) | Low (outsourced) | Very High: WireGuard encryption, Tailscale identity | Low: Tailscale handles everything |

Key insight: Start with per-machine tokens issued via one-time registration codes. This gives you most of the security benefit of mTLS (a leaked credential affects one machine, and registration tokens expire) with far less operational overhead, since there is no CA to run and no certificates to renew. If you need cross-machine direct communication (not just hub-to-node), add Tailscale as the transport layer rather than building your own mTLS infrastructure.


Pattern 1: Exponential Backoff with Jitter

When a connector loses its WebSocket connection, it must reconnect, but not all at once. If the hub goes down and comes back, every machine reconnecting simultaneously creates a "thundering herd" that can crash the hub again.

class ReconnectStrategy {
  private attempt = 0;
  private readonly baseDelayMs: number;
  private readonly maxDelayMs: number;
  private readonly jitterFactor: number;

  constructor(options?: {
    baseDelayMs?: number;
    maxDelayMs?: number;
    jitterFactor?: number;
  }) {
    this.baseDelayMs = options?.baseDelayMs ?? 1000;
    this.maxDelayMs = options?.maxDelayMs ?? 60_000;
    this.jitterFactor = options?.jitterFactor ?? 0.4; // +/- 20%
  }

  /**
   * Get the next delay, then increment the attempt counter.
   * Exponential backoff: delay = base * 2^attempt
   * With jitter: delay * (1 - jitterFactor/2 + random * jitterFactor)
   */
  nextDelay(): number {
    const exponential = this.baseDelayMs * 2 ** this.attempt;
    const capped = Math.min(exponential, this.maxDelayMs);
    const jitterMin = 1 - this.jitterFactor / 2;
    const jitter = jitterMin + Math.random() * this.jitterFactor;
    const delay = capped * jitter;

    this.attempt++;
    return Math.round(delay);
  }

  reset(): void {
    this.attempt = 0;
  }

  get currentAttempt(): number {
    return this.attempt;
  }
}

// Usage in the connector
const reconnect = new ReconnectStrategy({
  baseDelayMs: 1000,
  maxDelayMs: 60_000,
  jitterFactor: 0.4,
});

function scheduleReconnect(): void {
  const delay = reconnect.nextDelay();
  console.log(
    `[connector] reconnecting in ${(delay / 1000).toFixed(1)}s ` +
      `(attempt ${reconnect.currentAttempt})`
  );
  setTimeout(() => connect(), delay);
}

// On successful connection:
ws.on("open", () => {
  reconnect.reset();
  // ...
});

When to use: Every WebSocket, HTTP long-poll, or TCP connection that needs automatic reconnection.

Gotcha: Without jitter, all machines that disconnected at the same time will reconnect at the same time. The jitter spreads the reconnection window, preventing a thundering herd.
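
To make the spread concrete, the helper below computes the lower and upper bound of the delay for a given attempt, mirroring the formula in `nextDelay()`. `delayBounds` is a hypothetical helper added for illustration; its defaults match the values used in the usage example.

```typescript
// Bounds of the reconnect delay for one attempt, using the same formula as
// ReconnectStrategy.nextDelay(): cap the exponential, then apply jitter.
function delayBounds(
  attempt: number,
  baseDelayMs = 1000,
  maxDelayMs = 60_000,
  jitterFactor = 0.4
): { minMs: number; maxMs: number } {
  const capped = Math.min(baseDelayMs * 2 ** attempt, maxDelayMs);
  const jitterMin = 1 - jitterFactor / 2;
  return {
    minMs: Math.round(capped * jitterMin),
    maxMs: Math.round(capped * (jitterMin + jitterFactor)),
  };
}

for (const attempt of [0, 3, 10]) {
  const { minMs, maxMs } = delayBounds(attempt);
  console.log(`attempt ${attempt}: ${minMs} to ${maxMs} ms`);
}
```

With the defaults, the first retry lands between 800 ms and 1200 ms. Because jitter is applied after the cap, late attempts can wait up to about 72 s, slightly above `maxDelayMs`.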

Pattern 2: Capability-Gated Dispatch

Route work to machines based on declared capabilities, not hardcoded addresses.

class CapabilityRouter {
  private registry: MachineRegistry;

  constructor(registry: MachineRegistry) {
    this.registry = registry;
  }

  /**
   * Find the best machine for a capability.
   * Selection criteria (in priority order):
   * 1. Machine has the capability
   * 2. Machine is not busy
   * 3. Machine meets resource requirements
   * 4. Machine has the lowest load average
   */
  selectMachine(
    capability: string,
    requirements?: {
      gpu?: boolean;
      minMemoryGb?: number;
      preferLocal?: boolean;
    }
  ): RegisteredMachine | null {
    const candidates = this.findMachinesForCapability(capability);

    if (candidates.length === 0) return null;

    // Filter by requirements
    let filtered = candidates;
    if (requirements?.gpu) {
      filtered = filtered.filter(
        (m) => (m.resources.gpus?.length ?? 0) > 0
      );
    }
    if (requirements?.minMemoryGb) {
      filtered = filtered.filter(
        (m) => m.resources.freeMemoryGb >= requirements.minMemoryGb!
      );
    }

    if (filtered.length === 0) return null;

    // Score each candidate
    const scored = filtered.map((machine) => {
      let score = 100;

      // Penalty for being busy
      if (machine.busy) score -= 50;

      // Penalty for high load
      const load = machine.resources.loadAvg?.[0] ?? 0;
      const loadPerCore = load / machine.resources.cpus;
      score -= loadPerCore * 20;

      // Penalty for low memory
      const memPct =
        machine.resources.freeMemoryGb / machine.resources.memoryGb;
      if (memPct < 0.2) score -= 30;

      // Bonus for local machine (if preferLocal)
      if (requirements?.preferLocal) {
        // Placeholder: a real implementation would check that this machine
        // is on the same network (e.g. detected via mDNS) before adding the
        // bonus. As written, every candidate gets it, so ranking is unchanged.
        score += 10;
      }

      return { machine, score };
    });

    // Sort by score descending
    scored.sort((a, b) => b.score - a.score);

    return scored[0]?.machine ?? null;
  }

  private findMachinesForCapability(
    capability: string
  ): RegisteredMachine[] {
    const machineIds = this.registry.capabilityIndex.get(capability);
    if (!machineIds) return [];

    const now = Date.now();
    return Array.from(machineIds)
      .map((id) => this.registry.machines.get(id))
      .filter((m): m is RegisteredMachine => {
        if (!m) return false;
        if (!m.connected) return false;
        const expiresAt = m.lastHeartbeat + m.ttlSeconds * 1000;
        return now < expiresAt;
      });
  }
}

When to use: Any time you need to dispatch work to heterogeneous machines. The caller says what they need, not where to send it.

Gotcha: Capability routing is only as good as the capability declarations. If a machine declares "gpu-inference" but its VRAM is full, the dispatch will fail. Capabilities must be validated against current resource state at dispatch time, not only at registration.
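
One way to approach this is a pre-dispatch check against the most recent heartbeat data. The sketch below is an assumption-heavy illustration: `MachineSnapshot`, `validateGpuDispatch`, and the 90-second staleness threshold are all hypothetical, and the real registry types may differ.

```typescript
// Hypothetical view of what a machine reported in its latest heartbeat
interface GpuSnapshot {
  freeMemoryMb: number;
}

interface MachineSnapshot {
  machineId: string;
  /** Epoch ms of the last heartbeat */
  lastHeartbeat: number;
  gpus: GpuSnapshot[];
}

interface DispatchCheck {
  ok: boolean;
  reason?: string;
}

// Reject the dispatch when the snapshot is stale or no GPU has enough
// free VRAM for the job.
function validateGpuDispatch(
  machine: MachineSnapshot,
  requiredFreeMb: number,
  now: number = Date.now(),
  maxSnapshotAgeMs = 90_000
): DispatchCheck {
  if (now - machine.lastHeartbeat > maxSnapshotAgeMs) {
    return { ok: false, reason: "stale heartbeat" };
  }

  // Best case across all GPUs on the machine (0 when there are none)
  const bestFreeMb = Math.max(0, ...machine.gpus.map((g) => g.freeMemoryMb));
  if (bestFreeMb < requiredFreeMb) {
    return {
      ok: false,
      reason: `needs ${requiredFreeMb} MB free VRAM, best GPU has ${bestFreeMb} MB`,
    };
  }

  return { ok: true };
}
```

A router could run this after scoring and fall through to the next candidate when the check fails, instead of discovering the problem only when the machine rejects the request.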

Pattern 3: Pipeline Checkpoint and Resume

Long-running pipelines need checkpointing so that if a step fails partway through, the pipeline can resume from the last successful step rather than starting over.

interface PipelineCheckpoint {
  pipelineId: string;
  executionId: string;
  currentStep: number;
  stepResults: StepResult[];
  /** Input for the next step (output of the last successful step) */
  nextInput: unknown;
  startedAt: string;
  lastCheckpointAt: string;
  status: "running" | "paused" | "failed" | "completed";
}

class CheckpointedPipeline {
  private storage: DurableObjectStorage; // Or any KV store

  constructor(storage: DurableObjectStorage) {
    this.storage = storage;
  }

  /**
   * Execute a pipeline with checkpointing.
   * If a previous execution exists and was interrupted, resume from the
   * last checkpoint.
   */
  async execute(
    pipeline: PipelineDefinition,
    initialInput: unknown,
    executionId?: string
  ): Promise<PipelineResult> {
    const execId = executionId ?? crypto.randomUUID();

    // Check for existing checkpoint
    let checkpoint = await this.loadCheckpoint(execId);

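    if (checkpoint && checkpoint.status !== "completed") {
      // Resume interrupted and failed executions alike; a failed run
      // restarts at the step that failed, not at step 0
      console.log(
        `[pipeline] Resuming execution ${execId} from step ${checkpoint.currentStep}`
      );
      checkpoint.status = "running";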
    } else {
      checkpoint = {
        pipelineId: pipeline.id,
        executionId: execId,
        currentStep: 0,
        stepResults: [],
        nextInput: initialInput,
        startedAt: new Date().toISOString(),
        lastCheckpointAt: new Date().toISOString(),
        status: "running",
      };
    }

    // Execute remaining steps
    for (
      let i = checkpoint.currentStep;
      i < pipeline.steps.length;
      i++
    ) {
      const step = pipeline.steps[i];

      try {
        const result = await this.executeStep(
          step,
          checkpoint.nextInput,
          pipeline.id,
          i
        );

        checkpoint.stepResults.push(result);
        checkpoint.nextInput = result.result;
        checkpoint.currentStep = i + 1;
        checkpoint.lastCheckpointAt = new Date().toISOString();

        // Persist checkpoint after each step
        await this.saveCheckpoint(checkpoint);
      } catch (error) {
        checkpoint.status = "failed";
        await this.saveCheckpoint(checkpoint);

        return {
          pipelineId: pipeline.id,
          status: "failed",
          steps: checkpoint.stepResults,
          failedAt: i,
        };
      }
    }

    checkpoint.status = "completed";
    await this.saveCheckpoint(checkpoint);

    return {
      pipelineId: pipeline.id,
      status: "completed",
      steps: checkpoint.stepResults,
      output: checkpoint.nextInput,
    };
  }

  private async executeStep(
    step: PipelineStep,
    input: unknown,
    pipelineId: string,
    stepIndex: number
  ): Promise<StepResult> {
    const startTime = Date.now();

    // Retry loop
    let lastError: Error | null = null;
    const maxRetries = step.retryPolicy?.maxRetries ?? 0;

    for (let attempt = 0; attempt <= maxRetries; attempt++) {
      try {
        const { machineId, result } = await this.dispatch(
          step.capability,
          input,
          { timeoutMs: step.timeoutMs, pipelineId, stepIndex }
        );

        return {
          step: stepIndex,
          capability: step.capability,
          machineId,
          status: "completed",
          result: step.outputTransform
            ? step.outputTransform(result)
            : result,
          durationMs: Date.now() - startTime,
        };
      } catch (err) {
        lastError = err instanceof Error ? err : new Error(String(err));

        if (attempt < maxRetries) {
          const backoff = Math.min(
            (step.retryPolicy?.backoffMs ?? 1000) *
              (step.retryPolicy?.backoffMultiplier ?? 2) ** attempt,
            step.retryPolicy?.maxBackoffMs ?? 30_000
          );
          await new Promise((resolve) => setTimeout(resolve, backoff));
        }
      }
    }

    throw lastError;
  }

  private async dispatch(
    capability: string,
    input: unknown,
    options: { timeoutMs?: number; pipelineId: string; stepIndex: number }
  ): Promise<{ machineId: string; result: unknown }> {
    // Delegate to the hub's dispatch mechanism
    throw new Error("Implement via hub reference");
  }

  private async loadCheckpoint(
    executionId: string
  ): Promise<PipelineCheckpoint | null> {
    return (
      (await this.storage.get<PipelineCheckpoint>(
        `checkpoint:${executionId}`
      )) ?? null
    );
  }

  private async saveCheckpoint(
    checkpoint: PipelineCheckpoint
  ): Promise<void> {
    await this.storage.put(
      `checkpoint:${checkpoint.executionId}`,
      checkpoint
    );
  }
}

When to use: Any multi-step pipeline where individual steps are expensive (GPU inference, large data transfers) and re-execution is wasteful.

Connection to other patterns: Checkpointing uses the same storage layer as the hub's machine registry. Both are Durable Object state. The pipeline executor references the capability router (Pattern 2) for dispatch.
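
For local testing outside a Durable Object, the two storage methods the checkpoint code relies on (`get` and `put`) can be stubbed in memory. The class below is a hypothetical stand-in, not part of the Cloudflare API; it round-trips values through JSON to approximate the copy semantics of real storage.

```typescript
// Minimal in-memory stand-in for the get/put subset of DurableObjectStorage
// used by the checkpoint code. Intended for tests only.
class InMemoryCheckpointStorage {
  private readonly data = new Map<string, string>();

  async get<T>(key: string): Promise<T | undefined> {
    const raw = this.data.get(key);
    // Parse a fresh copy so callers cannot mutate stored state by reference
    return raw === undefined ? undefined : (JSON.parse(raw) as T);
  }

  async put<T>(key: string, value: T): Promise<void> {
    this.data.set(key, JSON.stringify(value));
  }
}
```

A test could then construct the pipeline with `new CheckpointedPipeline(new InMemoryCheckpointStorage() as unknown as DurableObjectStorage)`; the cast is needed because the stub implements only part of the interface.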

Pattern 4: Fan-Out / Fan-In

When a step needs to run on multiple machines in parallel (e.g., distributed inference across GPUs), use fan-out to dispatch to all capable machines and fan-in to collect results.

interface FanOutStep {
  capability: string;
  /** How to split the input for parallel execution */
  splitter: (input: unknown) => unknown[];
  /** How to combine the results */
  reducer: (results: unknown[]) => unknown;
  /** Maximum parallelism */
  maxParallel?: number;
  /** Minimum machines required (fail if fewer available) */
  minMachines?: number;
}

async function executeFanOut(
  step: FanOutStep,
  input: unknown,
  router: CapabilityRouter,
  registry: MachineRegistry
): Promise<{ results: unknown[]; combined: unknown }> {
  // Split input into chunks
  const chunks = step.splitter(input);

  // Find all machines with this capability (assumes CapabilityRouter
  // exposes a public findAllMachinesForCapability lookup)
  const machines = router.findAllMachinesForCapability(step.capability);

  // At least one machine is always needed, even if minMachines is unset
  const required = Math.max(step.minMachines ?? 1, 1);
  if (machines.length < required) {
    throw new Error(
      `Fan-out requires ${required} machines for "${step.capability}", ` +
        `but only ${machines.length} available`
    );
  }

  // Limit parallelism
  const maxParallel = step.maxParallel ?? machines.length;
  const concurrency = Math.min(maxParallel, machines.length, chunks.length);

  // Distribute chunks across machines (round-robin)
  const assignments: { machine: RegisteredMachine; chunk: unknown }[] =
    chunks.map((chunk, i) => ({
      machine: machines[i % machines.length],
      chunk,
    }));

  // Execute in parallel with concurrency limit. Results are stored by chunk
  // index so the reducer sees them in input order.
  const results: unknown[] = new Array(chunks.length);
  const executing = new Set<Promise<void>>();
  const started: Promise<void>[] = [];

  for (let index = 0; index < assignments.length; index++) {
    const assignment = assignments[index];
    const promise = (async () => {
      const ws = registry.connections.get(assignment.machine.machineId);
      if (!ws) throw new Error("Machine disconnected during fan-out");

      const requestId = crypto.randomUUID();
      results[index] = await dispatchAndWait(ws, {
        type: "request",
        id: requestId,
        capability: step.capability,
        input: assignment.chunk,
      });
    })();

    executing.add(promise);
    started.push(promise);
    // The derived promise only does bookkeeping; failures surface through
    // Promise.race / Promise.all on the original promise.
    promise.finally(() => executing.delete(promise)).catch(() => {});

    if (executing.size >= concurrency) {
      await Promise.race(executing);
    }
  }

  // Wait for everything that was started, so no failure is dropped
  await Promise.all(started);

  // Reduce results
  const combined = step.reducer(results);

  return { results, combined };
}

// Example: Distributed text embedding across GPU machines
const embeddingFanOut: FanOutStep = {
  capability: "text-embedding",
  splitter: (input) => {
    const texts = (input as { texts: string[] }).texts;
    // Split into chunks of 100
    const chunks: unknown[] = [];
    for (let i = 0; i < texts.length; i += 100) {
      chunks.push({ texts: texts.slice(i, i + 100) });
    }
    return chunks;
  },
  reducer: (results) => ({
    embeddings: results.flatMap(
      (r) => (r as { embeddings: number[][] }).embeddings
    ),
  }),
  maxParallel: 4,
  minMachines: 1,
};

Pattern 5: Self-Update via Hub Command

The hub can instruct machines to upgrade themselves. This uses the heartbeat response as a command channel.

// Hub side: schedule an update rollout
class UpdateRollout {
  private machines: Map<string, RegisteredMachine>;
  private targetVersion: string;
  private downloadUrl: string;
  private rollingPercentage: number;
  private updatedMachines: Set<string> = new Set();

  constructor(
    machines: Map<string, RegisteredMachine>,
    targetVersion: string,
    downloadUrl: string,
    rollingPercentage = 10 // Update 10% at a time
  ) {
    this.machines = machines;
    this.targetVersion = targetVersion;
    this.downloadUrl = downloadUrl;
    this.rollingPercentage = rollingPercentage;
  }

  /** Machines that were sent an update and have not yet reported the target version */
  private inFlight: Set<string> = new Set();

  /**
   * Called on each heartbeat. Returns update command if this machine
   * should update.
   */
  shouldUpdate(machineId: string, currentVersion: string): UpdateCommand | null {
    if (currentVersion === this.targetVersion) {
      this.updatedMachines.add(machineId);
      this.inFlight.delete(machineId);
      return null;
    }

    // Check if we have capacity to update more machines. Only machines that
    // were actually told to update count against the batch; counting every
    // outdated machine would block the rollout before it starts.
    const maxUpdating = Math.ceil(
      (this.machines.size * this.rollingPercentage) / 100
    );

    if (!this.inFlight.has(machineId) && this.inFlight.size >= maxUpdating) {
      return null; // Wait for current batch to finish
    }

    this.inFlight.add(machineId);

    return {
      type: "update",
      version: this.targetVersion,
      url: this.downloadUrl,
      restart: true,
    };
  }
}

// Connector side: handle update command
async function handleUpdate(command: UpdateCommand): Promise<void> {
  if (!command.version || !command.url) {
    console.warn("[connector] update command missing version or URL");
    return;
  }

  console.log(`[connector] Updating to version ${command.version}...`);

  const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "atlas-update-"));

  try {
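    // Download new binary
    const response = await fetch(command.url);
    if (!response.ok) {
      // Never install an HTML error page as the new binary
      throw new Error(`Update download failed with HTTP ${response.status}`);
    }
    const buffer = Buffer.from(await response.arrayBuffer());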
    const tmpPath = path.join(tmpDir, "atlas-connector");
    await fs.writeFile(tmpPath, buffer);
    await fs.chmod(tmpPath, 0o755);

    // Verify the new binary runs
    const { execFile } = await import("node:child_process");
    await new Promise<void>((resolve, reject) => {
      execFile(tmpPath, ["--version"], { timeout: 5000 }, (err, stdout) => {
        if (err) reject(err);
        else {
          console.log(`[connector] New binary version: ${stdout.trim()}`);
          resolve();
        }
      });
    });

    // Replace the current binary
    const currentPath = process.argv[1]; // Or configured install path
    await fs.copyFile(tmpPath, currentPath);
    await fs.chmod(currentPath, 0o755);

    console.log("[connector] Binary updated successfully");

    if (command.restart) {
      console.log("[connector] Restarting...");
      process.exit(0); // launchd/systemd will restart us
    }
  } finally {
    await fs.rm(tmpDir, { recursive: true, force: true });
  }
}
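
Running `--version` confirms that the download executes, but not that it is the binary the hub intended. A digest check before the file is made executable closes that gap. The sketch below assumes the update command would carry an expected SHA-256 hex digest (a hypothetical `sha256` field that is not part of the `UpdateCommand` shown above).

```typescript
import { createHash, timingSafeEqual } from "node:crypto";

// Returns true when the downloaded bytes hash to the expected SHA-256 digest.
function verifySha256(data: Uint8Array, expectedHex: string): boolean {
  const actual = createHash("sha256").update(data).digest();
  const expected = Buffer.from(expectedHex.trim().toLowerCase(), "hex");
  // timingSafeEqual throws on length mismatch, so compare lengths first
  return expected.length === actual.length && timingSafeEqual(actual, expected);
}
```

In `handleUpdate`, the call would sit between the download and the `writeFile`, aborting the update when it returns false.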

Example 1: Minimal Connector (40 lines)

The simplest possible connector that registers and heartbeats:

import WebSocket from "ws";

const HUB = process.env.ATLAS_HUB ?? "wss://api-mom.garywu.dev/connect";
const TOKEN = process.env.ATLAS_TOKEN!;
const MACHINE = process.env.HOSTNAME ?? "unknown";

function connect() {
  const ws = new WebSocket(`${HUB}?machine=${encodeURIComponent(MACHINE)}`, {
    headers: { Authorization: `Bearer ${TOKEN}` },
  });

  ws.on("open", () => {
    console.log("Connected");
    ws.send(JSON.stringify({ type: "status-report", machine: MACHINE, busy: false }));
  });

  ws.on("message", (raw) => {
    const msg = JSON.parse(raw.toString());
    if (msg.type === "ping") ws.send(JSON.stringify({ type: "pong" }));
    if (msg.type === "request") {
      ws.send(JSON.stringify({
        type: "response", id: msg.id, status: 200,
        body: { echo: msg.body },
      }));
    }
  });

  ws.on("close", () => setTimeout(connect, 5000));
  ws.on("error", (err) => console.error("WebSocket error:", err.message)); // "close" follows and triggers the reconnect
}

connect();

// Heartbeat every 60s
setInterval(() => {
  fetch(`${HUB.replace(/^ws/, "http").replace(/\/connect$/, "")}/v1/machines/heartbeat`, {
    method: "POST",
    headers: { "Content-Type": "application/json", Authorization: `Bearer ${TOKEN}` },
    body: JSON.stringify({ machine_id: MACHINE, capabilities: ["echo"] }),
  }).catch(() => {});
}, 60_000);
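
The minimal connector above reconnects after a fixed 5-second delay, which the anti-patterns table in this article warns against for larger fleets. A minimal sketch of exponential backoff with full jitter follows. The function name, option names, and default values are illustrative assumptions, not part of the Atlas code.

```typescript
/** Options for reconnect delays; the defaults are illustrative assumptions. */
interface BackoffOptions {
  baseMs?: number;
  maxMs?: number;
  random?: () => number; // Injectable so the delay can be tested deterministically
}

/**
 * Full-jitter exponential backoff: a random delay in
 * [0, min(maxMs, baseMs * 2^attempt)).
 */
function reconnectDelay(attempt: number, opts: BackoffOptions = {}): number {
  const { baseMs = 1_000, maxMs = 60_000, random = Math.random } = opts;
  const ceiling = Math.min(maxMs, baseMs * Math.pow(2, attempt));
  return Math.floor(random() * ceiling);
}

// Possible wiring in the connector (sketch):
//   let attempt = 0;
//   ws.on("open", () => { attempt = 0; });
//   ws.on("close", () => setTimeout(connect, reconnectDelay(attempt++)));
```

Resetting the attempt counter on a successful `open` keeps a long-lived connector from carrying a large delay into its next disconnect.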

Example 2: GPU Capability Auto-Detection

Detect GPU availability and report it as a capability:

import { execFile } from "node:child_process";

interface GpuInfo {
  name: string;
  memoryMb: number;
  freeMemoryMb: number;
  utilization: number;
}

async function detectGpus(): Promise<GpuInfo[]> {
  // Try nvidia-smi first
  try {
    return await detectNvidiaGpus();
  } catch {}

  // Try macOS Metal (Apple Silicon)
  try {
    return await detectAppleGpu();
  } catch {}

  return [];
}

function detectNvidiaGpus(): Promise<GpuInfo[]> {
  return new Promise((resolve, reject) => {
    execFile(
      "nvidia-smi",
      ["--query-gpu=name,memory.total,memory.free,utilization.gpu",
       "--format=csv,noheader,nounits"],
      { timeout: 5000 },
      (err, stdout) => {
        if (err) return reject(err);
        const gpus = stdout
          .trim()
          .split("\n")
          .filter((line) => line.length > 0) // Empty output means no GPUs, not one blank GPU
          .map((line) => {
            const [name, totalMem, freeMem, util] = line.split(", ");
            return {
              name: name.trim(),
              memoryMb: parseInt(totalMem, 10),
              freeMemoryMb: parseInt(freeMem, 10),
              utilization: parseInt(util, 10),
            };
          });
        resolve(gpus);
      }
    );
  });
}

function detectAppleGpu(): Promise<GpuInfo[]> {
  return new Promise((resolve, reject) => {
    execFile(
      "system_profiler",
      ["SPDisplaysDataType", "-json"],
      { timeout: 5000 },
      (err, stdout) => {
        if (err) return reject(err);
        const data = JSON.parse(stdout);
        const displays = data.SPDisplaysDataType ?? [];
        const gpus = displays.map((d: Record<string, string>) => ({
          name: d.sppci_model ?? "Apple GPU",
          memoryMb: parseInt(d.sppci_vram ?? "0") * 1024, // Convert GB to MB
          freeMemoryMb: 0, // macOS doesn't expose free VRAM easily
          utilization: 0,
        }));
        resolve(gpus);
      }
    );
  });
}

// Use in capability registration
const gpus = await detectGpus();
if (gpus.length > 0) {
  capabilities.push({
    name: "gpu-inference",
    description: `GPU inference (${gpus[0].name})`,
    resources: { gpu: true },
    tags: gpus.map((g) => g.name),
  });
}

Example 3: mDNS Peer Discovery with Auto-Registration

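Advertise this connector over mDNS and find other Atlas connectors on the local network. The sketch only logs discovered peers; reporting them to the hub would go in the discovery callback: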

import { Bonjour } from "bonjour-service";

const bonjour = new Bonjour();

// Advertise ourselves
bonjour.publish({
  name: `atlas-${machineId}`,
  type: "atlas-connector",
  protocol: "tcp",
  port: 7654,
  txt: {
    id: machineId,
    role: "node",
    caps: capabilities.map((c) => c.name).join(","),
    version: "0.1.0",
  },
});

// Discover and report peers to the hub
bonjour.find({ type: "atlas-connector" }, (service) => {
  const txt = service.txt as Record<string, string>;
  if (txt.id === machineId) return; // Skip self

  console.log(`Discovered peer: ${txt.id} at ${service.host}:${service.port}`);
  console.log(`  Capabilities: ${txt.caps}`);

  // Optionally: establish direct connection for P2P communication
  // This is useful for large data transfers (don't route through the hub)
});

Example 4: Heartbeat with Adaptive Interval

Increase heartbeat frequency when the machine is busy, decrease when idle:

class AdaptiveHeartbeat {
  private baseInterval: number;
  private minInterval: number;
  private maxInterval: number;
  private currentInterval: number;
  private timer: ReturnType<typeof setTimeout> | null = null;
  private busy = false;

  constructor(options?: {
    baseIntervalMs?: number;
    minIntervalMs?: number;
    maxIntervalMs?: number;
  }) {
    this.baseInterval = options?.baseIntervalMs ?? 60_000;
    this.minInterval = options?.minIntervalMs ?? 10_000;
    this.maxInterval = options?.maxIntervalMs ?? 300_000;
    this.currentInterval = this.baseInterval;
  }

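  start(sendHeartbeat: () => Promise<void>): void {
    const tick = async () => {
      try {
        await sendHeartbeat();
      } catch (err) {
        // A failed heartbeat must not stop the loop
        console.warn("[heartbeat] send failed:", err);
      } finally {
        this.timer = setTimeout(tick, this.currentInterval);
      }
    };
    void tick();
  }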

  setBusy(busy: boolean): void {
    const wasBusy = this.busy;
    this.busy = busy;

    if (busy && !wasBusy) {
      // Machine just became busy: heartbeat faster
      this.currentInterval = this.minInterval;
    } else if (!busy && wasBusy) {
      // Machine just became idle: return to base
      this.currentInterval = this.baseInterval;
    }
  }

  /**
   * Gradually increase interval during long idle periods.
   * Called by the heartbeat response handler.
   */
  adjustForIdle(): void {
    if (!this.busy) {
      this.currentInterval = Math.min(
        this.currentInterval * 1.5,
        this.maxInterval
      );
    }
  }

  stop(): void {
    if (this.timer) {
      clearTimeout(this.timer);
      this.timer = null;
    }
  }
}
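
To make the idle back-off in `adjustForIdle()` concrete, the arithmetic can be traced with a small standalone helper. This is a sketch that mirrors the 1.5x growth and the 300-second cap from the class above; the helper names are hypothetical.

```typescript
/** Mirrors adjustForIdle(): grow by 1.5x per idle heartbeat, capped at maxMs. */
function nextIdleInterval(currentMs: number, maxMs = 300_000): number {
  return Math.min(currentMs * 1.5, maxMs);
}

/** Intervals produced by `count` consecutive idle heartbeats starting from `startMs`. */
function idleSchedule(startMs: number, count: number, maxMs = 300_000): number[] {
  const intervals: number[] = [];
  let current = startMs;
  for (let i = 0; i < count; i++) {
    current = nextIdleInterval(current, maxMs);
    intervals.push(current);
  }
  return intervals;
}

// Starting from the 60 s base: 90000, 135000, 202500, then capped at 300000
console.log(idleSchedule(60_000, 5));
```

With the defaults, an idle machine reaches the 5-minute ceiling after four heartbeats, so the hub's liveness timeout needs to be longer than `maxIntervalMs`.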

Example 5: Capability Health Check

Verify that declared capabilities still work:

async function healthCheckCapabilities(
  capabilities: Map<string, CapabilityHandler>
): Promise<{ name: string; healthy: boolean; error?: string }[]> {
  const results: { name: string; healthy: boolean; error?: string }[] = [];

  for (const [name, handler] of capabilities) {
    try {
      // Each capability should handle empty/minimal input gracefully
      const result = await Promise.race([
        handler({}),
        new Promise<CapabilityResult>((_, reject) =>
          setTimeout(() => reject(new Error("Health check timeout")), 5000)
        ),
      ]);

      results.push({
        name,
        healthy: result.success || result.error === undefined,
        error: result.error,
      });
    } catch (err) {
      results.push({
        name,
        healthy: false,
        error: err instanceof Error ? err.message : String(err),
      });
    }
  }

  return results;
}

// Run health checks periodically and re-advertise whenever the healthy set changes,
// so capabilities that recover are advertised again
let lastAdvertised: string | null = null;

setInterval(async () => {
  const health = await healthCheckCapabilities(CAPABILITY_HANDLERS);
  const unhealthy = health.filter((h) => !h.healthy);

  if (unhealthy.length > 0) {
    console.warn(
      `[connector] Unhealthy capabilities: ${unhealthy.map((h) => h.name).join(", ")}`
    );
  }

  // Advertise only healthy capabilities
  const healthyCaps = health
    .filter((h) => h.healthy)
    .map((h) => h.name);

  const key = healthyCaps.join(",");
  if (key === lastAdvertised) return;
  lastAdvertised = key;

  ws?.send(
    JSON.stringify({
      type: "capability-advertisement",
      capabilities: healthyCaps.map((name) => ({
        name,
        spec: { description: `Capability: ${name}` },
      })),
    })
  );
}, 5 * 60_000); // Every 5 minutes

Example 6: Pipeline Definition as JSON

Define pipelines in a JSON file that the hub loads:

{
  "pipelines": [
    {
      "id": "daily-brief",
      "name": "Daily Brief Pipeline",
      "description": "Gather metrics, summarize, and push to HUD",
      "schedule": "0 8 * * *",
      "steps": [
        {
          "capability": "gather-metrics",
          "timeoutMs": 30000,
          "retryPolicy": { "maxRetries": 2, "backoffMs": 5000 }
        },
        {
          "capability": "llm-summarize",
          "selector": { "matchLabels": { "atlas.dev/inference": "ollama" } },
          "timeoutMs": 60000
        },
        {
          "capability": "hud-notify",
          "selector": { "matchLabels": { "atlas.dev/display": "true" } },
          "inputTransform": "$.summary -> { severity: 'green', message: $.text }"
        }
      ]
    },
    {
      "id": "deploy-preview",
      "name": "Deploy Preview",
      "description": "Build, test, and deploy a preview environment",
      "steps": [
        {
          "capability": "git-clone",
          "timeoutMs": 60000
        },
        {
          "capability": "npm-build",
          "selector": { "matchLabels": { "atlas.dev/arch": "arm64" } },
          "timeoutMs": 300000
        },
        {
          "capability": "wrangler-deploy",
          "timeoutMs": 60000,
          "retryPolicy": { "maxRetries": 3, "backoffMs": 10000 }
        }
      ]
    }
  ]
}
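
The `selector.matchLabels` fields in the pipeline definition above imply that the hub filters machines by label before routing a step. A minimal sketch of that matching follows, assuming each registered machine exposes a flat string-to-string label map. The `Machine` shape and both function names are hypothetical.

```typescript
type Labels = Record<string, string>;

interface Selector {
  matchLabels?: Labels;
}

/** Hypothetical view of a registered machine as the hub might store it. */
interface Machine {
  id: string;
  capabilities: string[];
  labels: Labels;
}

/** True when every key/value in matchLabels is present on the machine. */
function matchesSelector(labels: Labels, selector?: Selector): boolean {
  const wanted = selector?.matchLabels ?? {};
  return Object.entries(wanted).every(([key, value]) => labels[key] === value);
}

/** Machines that advertise the step's capability and satisfy its selector. */
function candidatesFor(
  step: { capability: string; selector?: Selector },
  machines: Machine[]
): Machine[] {
  return machines.filter(
    (m) =>
      m.capabilities.includes(step.capability) &&
      matchesSelector(m.labels, step.selector)
  );
}
```

A step without a selector matches any machine that advertises the capability, which is how the `gather-metrics` and `git-clone` steps above would be routed under this sketch.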

Example 7: WebSocket Message Router

A typed message router that dispatches to handlers based on message type:

type MessageHandler<T> = (msg: T) => Promise<void> | void;

class MessageRouter {
  private handlers: Map<string, MessageHandler<any>> = new Map();

  on<T extends { type: string }>(
    type: T["type"],
    handler: MessageHandler<T>
  ): this {
    this.handlers.set(type, handler);
    return this;
  }

  async route(raw: string): Promise<void> {
    let msg: { type: string };
    try {
      msg = JSON.parse(raw);
    } catch {
      console.warn("Unparseable message");
      return;
    }

    const handler = this.handlers.get(msg.type);
    if (handler) {
      await handler(msg);
    } else {
      console.warn(`No handler for message type: ${msg.type}`);
    }
  }
}

// Usage
const router = new MessageRouter()
  .on<PingMessage>("ping", () => {
    ws.send(JSON.stringify({ type: "pong" }));
  })
  .on<DispatchRequest>("request", async (msg) => {
    await handleDispatch(ws, msg, machineId);
  })
  .on<UpdateCommand>("update", async (msg) => {
    await handleUpdate(msg);
  })
  .on<ConfigPush>("config-push", (msg) => {
    applyConfig(msg.config);
  });

ws.on("message", (raw) => router.route(raw.toString()));

Example 8: Graceful Shutdown with Drain

Complete in-flight tasks before shutting down:

class GracefulShutdown {
  private activeTasks: Map<string, Promise<void>> = new Map();
  private shutdownRequested = false;
  private drainTimeoutMs: number;

  constructor(drainTimeoutMs = 30_000) {
    this.drainTimeoutMs = drainTimeoutMs;
  }

  /**
   * Track a task. It is removed from the active set when its promise settles.
   */
  trackTask(taskId: string, promise: Promise<void>): void {
    this.activeTasks.set(taskId, promise);
    promise
      .finally(() => this.activeTasks.delete(taskId))
      .catch(() => {}); // The caller handles failures; avoid an unhandled rejection here
  }

  isShuttingDown(): boolean {
    return this.shutdownRequested;
  }

  /**
   * Begin graceful shutdown:
   * 1. Stop accepting new tasks
   * 2. Wait for active tasks to complete (up to drainTimeout)
   * 3. Send deregistration
   * 4. Exit
   */
  async shutdown(
    ws: WebSocket | null,
    config: { apiMomUrl: string; machineId: string; secret: string }
  ): Promise<void> {
    this.shutdownRequested = true;
    console.log(
      `[shutdown] Draining ${this.activeTasks.size} active tasks...`
    );

    // Wait for active tasks with timeout (allSettled: a failed task must not abort shutdown)
    const drainPromise = Promise.allSettled(this.activeTasks.values());
    const timeoutPromise = new Promise<void>((resolve) =>
      setTimeout(() => {
        console.warn("[shutdown] Drain timeout, forcing shutdown");
        resolve();
      }, this.drainTimeoutMs)
    );

    await Promise.race([drainPromise, timeoutPromise]);

    // Send final status
    if (ws?.readyState === WebSocket.OPEN) {
      ws.send(
        JSON.stringify({
          type: "status-report",
          machine: config.machineId,
          busy: false,
          version: "0.1.0",
        })
      );
      ws.close(1000, "shutdown");
    }

    // Deregister
    try {
      await fetch(
        `${config.apiMomUrl}/v1/machines/${config.machineId}`,
        {
          method: "DELETE",
          headers: { Authorization: `Bearer ${config.secret}` },
        }
      );
    } catch {
      // Best effort
    }

    console.log("[shutdown] Complete");
    process.exit(0);
  }
}

// Wire up
const shutdown = new GracefulShutdown(30_000);

process.on("SIGINT", () => shutdown.shutdown(ws, config));
process.on("SIGTERM", () => shutdown.shutdown(ws, config));

Ray Clusters

Ray is the most widely adopted framework for distributed AI workloads. A Ray cluster has a head node and worker nodes.

Architecture:

Machine registration:

# On the head node
ray start --head --port=6379

# On each worker node, declaring the resources it offers
ray start --address="head-ip:6379" --num-cpus=8 --num-gpus=2 \
  --resources='{"custom_resource": 1}'

What Atlas can learn from Ray:

Dask Distributed

Dask is a parallel computing library with a central scheduler and distributed workers.

Key patterns:

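# Start the scheduler
dask scheduler

# Start a worker that declares two GPUs as an abstract resource
dask worker tcp://scheduler:8786 --resources "GPU=2" --nthreads 8

# Python client: submit a task that requires one GPU
from dask.distributed import Client

client = Client("tcp://scheduler:8786")
future = client.submit(gpu_task, data, resources={"GPU": 1})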

What Atlas can learn from Dask:

vLLM Distributed Inference

vLLM distributes large language model inference across multiple GPUs and nodes.

Architecture:

What Atlas can learn from vLLM:

Ollama + Hive

Ollama is primarily a single-machine tool, but the Hive framework adds multi-machine coordination.

Hive Architecture:

What Atlas can learn from Hive:

Temporal.io

Temporal is a workflow orchestration platform where workers poll task queues.

Key patterns:

What Atlas can learn from Temporal:


Multi-Machine Coordination Systems

| System | Discovery | Work Distribution | State | Transport | Best For |
| --- | --- | --- | --- | --- | --- |
| Atlas (Node/Hub/Chain) | Hub registry + mDNS | Push via WebSocket | Durable Objects | WebSocket + HTTPS | AI agent orchestration, mixed cloud/local |
| Ray | GCS (head node) | Scheduler push | Distributed object store | gRPC | ML training, distributed inference |
| Dask | Scheduler registration | Scheduler push | Distributed memory | TCP | Data science, array computing |
| Temporal | Task queue subscription | Worker pull | Persistence layer (Cassandra/PG) | gRPC | Durable workflows, business logic |
| Consul | Gossip + DNS | Service mesh routing | Raft consensus KV | HTTP/DNS/gRPC | Service discovery, config management |
| Kubernetes | API server + etcd | Scheduler placement | etcd | API server | Container orchestration |

Install Script Patterns

| Installer | OS Detection | Service Config | Auth Setup | Binary Verification | Uninstall |
| --- | --- | --- | --- | --- | --- |
| k3s | uname + /etc/os-release | systemd/openrc auto-detect | K3S_TOKEN env var | SHA256 checksum | k3s-uninstall.sh |
| Tailscale | curl + package manager | Native packages (apt/yum) | tailscale login (OAuth) | Package signatures | Package manager |
| Homebrew | xcode-select detection | N/A (user-space) | N/A | Git-based (source) | brew uninstall |
| mise | uname -s/-m | N/A (user-space) | N/A | Checksum + GPG | mise uninstall |
| Atlas (proposed) | uname + init system | launchd/systemd/openrc | Bearer token | SHA256 checksum | --uninstall flag |

Capability Discovery Approaches

| Approach | Scope | Latency | Fault Tolerance | Complexity | Best For |
| --- | --- | --- | --- | --- | --- |
| Central registry (Consul/etcd) | Global | Sub-ms | Depends on registry HA | Medium | Cross-datacenter services |
| Gossip (Serf/SWIM) | Cluster | O(log N) rounds | Very high (decentralized) | High | Large clusters, partition-tolerant |
| mDNS/Bonjour | LAN | Sub-second | High (no central point) | Low | Local network, zero-config |
| K8s labels + scheduler | Cluster | Sub-ms | API server HA | Medium | Container workloads |
| Hub-spoke (Atlas) | Global | Sub-ms | Hub is SPoF | Low | Simple topologies, AI agents |
| Tailscale ACLs | Global mesh | Sub-ms | Coordination server HA | Low (outsourced) | Secure cross-network |
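
The hub-spoke row above depends on the hub keeping a registry of which machines currently offer which capabilities. A minimal in-memory sketch follows, assuming heartbeats carry the full capability list and that a machine is treated as gone after a time-to-live with no heartbeat. The class name and the 180-second default (three missed 60-second heartbeats) are assumptions for illustration.

```typescript
interface MachineRecord {
  capabilities: Set<string>;
  lastSeenMs: number;
}

/** In-memory capability registry; a stand-in for whatever storage the hub actually uses. */
class CapabilityRegistry {
  private machines = new Map<string, MachineRecord>();
  private ttlMs: number;

  constructor(ttlMs = 180_000) {
    this.ttlMs = ttlMs;
  }

  /** Record a heartbeat, replacing the machine's advertised capabilities. */
  heartbeat(machineId: string, capabilities: string[], nowMs = Date.now()): void {
    this.machines.set(machineId, {
      capabilities: new Set(capabilities),
      lastSeenMs: nowMs,
    });
  }

  /** IDs of machines seen within the TTL that advertise the capability. */
  find(capability: string, nowMs = Date.now()): string[] {
    const live: string[] = [];
    this.machines.forEach((record, id) => {
      const fresh = nowMs - record.lastSeenMs <= this.ttlMs;
      if (fresh && record.capabilities.has(capability)) live.push(id);
    });
    return live;
  }
}
```

Because each heartbeat replaces the capability set, a machine that stops advertising `gpu-inference` drops out of lookups on its next heartbeat without a separate deregistration call.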

Pipeline / Workflow Engines

| Engine | Definition | Routing | Checkpointing | Fan-Out | Language |
| --- | --- | --- | --- | --- | --- |
| Atlas Chain | JSON declarative | Capability-based | DO storage | Manual (code) | TypeScript |
| Temporal | Code (SDK) | Task queue | Built-in (server) | Built-in (child workflows) | Multi-language |
| Airflow | Python DAG | Executor pool | Task instance state | Built-in (dynamic tasks) | Python |
| Prefect | Python flow | Work pool | Built-in (Prefect Cloud) | Built-in (.map()) | Python |
| N8N | Visual (JSON) | Node connection | Execution table | Built-in (split node) | TypeScript |
| Node-RED | Visual (JSON) | Wire connections | None (stateless) | Built-in (split node) | JavaScript |
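
The Checkpointing column above is what lets a failed pipeline resume instead of restarting. A minimal sketch of checkpoint-and-resume follows, with an in-memory store standing in for the Durable Object storage listed for Atlas Chain. The `Step`, `CheckpointStore`, and `runPipeline` names are hypothetical.

```typescript
type Checkpoint = { nextStep: number; data: unknown };
type Step = { capability: string; run: (input: unknown) => Promise<unknown> };

/** Minimal checkpoint store interface; real storage would be durable. */
interface CheckpointStore {
  load(runId: string): Promise<Checkpoint | undefined>;
  save(runId: string, checkpoint: Checkpoint): Promise<void>;
}

class MemoryCheckpointStore implements CheckpointStore {
  private checkpoints = new Map<string, Checkpoint>();

  async load(runId: string): Promise<Checkpoint | undefined> {
    return this.checkpoints.get(runId);
  }

  async save(runId: string, checkpoint: Checkpoint): Promise<void> {
    this.checkpoints.set(runId, checkpoint);
  }
}

/** Run steps in order, resuming after the last step that completed for this run. */
async function runPipeline(
  runId: string,
  steps: Step[],
  input: unknown,
  store: CheckpointStore
): Promise<unknown> {
  const saved = await store.load(runId);
  let data = saved ? saved.data : input;
  for (let i = saved ? saved.nextStep : 0; i < steps.length; i++) {
    // If this throws, the checkpoint still points at step i
    data = await steps[i].run(data);
    await store.save(runId, { nextStep: i + 1, data });
  }
  return data;
}
```

Calling `runPipeline` again with the same `runId` after a failure skips the steps that already completed and feeds the saved output into the step that failed.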

| Don't | Do Instead | Why |
| --- | --- | --- |
| Hardcode machine addresses in pipeline definitions | Use capability names and let the hub route | Machines come and go. IP addresses change. Capabilities are the stable abstraction. |
| Use a single shared token for all machines | Issue per-machine tokens via registration flow | One leaked token compromises one machine, not the entire network. |
| Skip the heartbeat and rely on WebSocket connection state | Heartbeat independently of WebSocket | WebSocket can appear connected (TCP half-open) while the machine is actually dead. Heartbeats confirm liveness at the application level. |
| Put secrets in the install script URL | Pass secrets via environment variables or stdin | curl https://install.example.com?token=SECRET shows up in shell history, server logs, and proxy logs. |
| Restart failed pipelines from step 0 | Checkpoint after each step and resume from the last checkpoint | Re-executing expensive steps (GPU inference, large downloads) wastes time and money. |
| Register capabilities once at startup | Re-evaluate capabilities at each heartbeat | GPU VRAM fills up. Disk space runs out. Services crash. Capabilities are dynamic, not static. |
| Route all traffic through the hub | Use hub for coordination but allow P2P for data transfer | The hub becomes a bottleneck for large payloads. Machines on the same LAN should transfer data directly. |
| Reconnect immediately after disconnect | Exponential backoff with jitter | Thundering herd: all machines reconnect at once and crash the hub again. |
| Deploy updates to all machines at once | Rolling update (10-20% at a time) | If the update is broken, you lose your entire fleet instead of 10-20%. |
| Block the main thread during capability execution | Run handlers in worker threads or async | A long-running capability blocks heartbeats, causing the hub to think the machine is dead. |
| Use hostname as the machine ID | Generate a UUID and persist it to disk | Hostnames can change. Two machines can have the same hostname. UUIDs are globally unique and stable. |
| Skip the uninstall script | Always provide a clean uninstall path | Users who can't cleanly remove your software won't install it in the first place. |
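
The machine ID row above recommends a generated UUID persisted to disk. A minimal sketch of that pattern follows, assuming a Node.js connector. The `~/.atlas/machine-id` location and the function name are assumptions for illustration, not paths the installer is known to use.

```typescript
import { randomUUID } from "node:crypto";
import * as fs from "node:fs";
import * as os from "node:os";
import * as path from "node:path";

/** Default location is an assumption for illustration. */
const DEFAULT_ID_FILE = path.join(os.homedir(), ".atlas", "machine-id");

/** Return the persisted machine ID, generating and saving a UUID on first run. */
function loadOrCreateMachineId(idFile: string = DEFAULT_ID_FILE): string {
  try {
    const existing = fs.readFileSync(idFile, "utf8").trim();
    if (existing) return existing;
  } catch (err) {
    // A missing file is expected on first run; anything else is a real error
    if ((err as NodeJS.ErrnoException).code !== "ENOENT") throw err;
  }
  const id = randomUUID();
  fs.mkdirSync(path.dirname(idFile), { recursive: true });
  fs.writeFileSync(idFile, id + "\n", { mode: 0o600 });
  return id;
}
```

The ID survives hostname changes and restarts because it is read back from disk on every start; deleting the file is the explicit way to make a machine register as new.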

