Tip: How to Build a Custom Entrypoint in Solace Agent Mesh

Solace Agent Mesh is an event-driven framework for building distributed ecosystems of collaborative AI agents, powered by the Solace Event Mesh for enterprise-grade agent orchestration. Solace Agent Mesh ships with entrypoints for the WebUI, REST API, Slack, Event Mesh, and more. But what happens when you need to connect your agents to a platform that does not have an entrypoint yet? You build your own.

Solace Agent Mesh provides a framework for building custom entrypoints that handles all the hard parts: A2A protocol mechanics, authentication flows, artifact management, and event mesh communication. You focus on one thing: translating between your platform and the agent mesh.

In this post, we will walk through both approaches to building a custom entrypoint, the key APIs and types you will work with, and practical patterns drawn from real-world implementations.


Two Approaches

Solace Agent Mesh offers two patterns for building custom entrypoints:

Gateway Adapter (Recommended)

You write a Python class that extends GatewayAdapter. The framework handles all A2A protocol complexity. You implement one required method (prepare_task()) and optionally override response handlers. This is the approach used by the Slack and MCP Server entrypoint plugins.

Full Custom Gateway (Advanced)

You extend BaseGatewayApp and BaseGatewayComponent directly. You handle A2A protocol details, task context management, and message routing yourself. Use this only when you need specialized authentication flows, custom A2A protocol behavior, or complex multi-stage processing pipelines.

This post focuses on the Gateway Adapter pattern, which covers the vast majority of use cases.


Getting Started

Option 1: Create as a Plugin (Recommended)

For reusable adapters that you plan to share or use across multiple projects, use the CLI:

solace-agent-mesh plugin create my-gateway-plugin

Select “Gateway Plugin” when prompted. This generates a project structure with BaseGatewayApp and BaseGatewayComponent subclasses (the full custom pattern).

To use the adapter pattern instead, you’ll need to:

  1. Replace the generated app.py and component.py with your GatewayAdapter subclass
  2. Update the config to use solace_agent_mesh.gateway.generic.app as the app_module
  3. Reference your adapter class in gateway_adapter

Generated structure:

my-gateway-plugin/
  pyproject.toml              # Package metadata and build config
  config.yaml                 # Gateway configuration template
  README.md
  src/my_gateway_plugin/
    __init__.py
    app.py                    # App class (replace for adapter pattern)
    component.py              # Component class (replace for adapter pattern)

Option 2: Add to Existing Project

For project-specific adapters, create your adapter class directly in your project:

# my_project/gateways/my_adapter.py
from solace_agent_mesh.gateway.adapter.base import GatewayAdapter
from solace_agent_mesh.gateway.adapter.types import (
    AuthClaims,
    GatewayContext,
    ResponseContext,
    SamTask,
    SamUpdate
)

class MyAdapter(GatewayAdapter):
    async def init(self, context: GatewayContext) -> None:
        self.context = context
        # Initialize your platform connection

    async def prepare_task(self, external_input, endpoint_context=None) -> SamTask:
        # Convert platform event to SamTask
        return SamTask(
            parts=[self.context.create_text_part(external_input["message"])],
            target_agent="OrchestratorAgent",
            session_id=f"my-platform-{external_input['conversation_id']}",
        )

Configuration

For the adapter pattern, your configuration YAML should use the generic app module and point to your adapter class:

apps:
  - name: my_entrypoint_app
    app_base_path: .
    app_module: solace_agent_mesh.gateway.generic.app
    broker:
      <<: *broker_connection

    app_config:
      namespace: "${NAMESPACE}"
      gateway_id: my-entrypoint-01
      gateway_adapter: my_entrypoint.component.MyAdapter
      adapter_config:
        api_token: ${MY_PLATFORM_TOKEN}
        webhook_port: ${WEBHOOK_PORT, 9090}
      default_agent_name: OrchestratorAgent
      artifact_service:
        type: "filesystem"
        base_path: "/tmp/samv2"
        artifact_scope: namespace
      system_purpose: >
        The system is an AI assistant...

Key points:

  • app_base_path is required and is typically set to . (the current directory)
  • app_module is always solace_agent_mesh.gateway.generic.app for the adapter pattern
  • gateway_adapter is the Python import path to your adapter class
  • adapter_config is passed to your adapter and validated against your Pydantic ConfigModel if defined
  • If gateway_id is not provided, it defaults to the app name

The Gateway Adapter API

The GatewayAdapter base class defines the contract between your platform-specific code and the framework. You implement the methods that matter for your platform and let the framework handle the rest.

The One Required Method: prepare_task()

This is the only method you must implement. It converts an incoming platform event into a SamTask—the standardized format that agents understand. This is where you extract the user’s message, attach any files, set the target agent, and store platform-specific routing data.

from solace_agent_mesh.gateway.adapter.base import GatewayAdapter
from solace_agent_mesh.gateway.adapter.types import (
    SamTask, AuthClaims, SamUpdate, SamError, ResponseContext, GatewayContext
)

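class MyAdapter(GatewayAdapter):
    async def init(self, context: GatewayContext) -> None:
        # Store the context so prepare_task() can build content parts
        self.context = context

    async def prepare_task(self, external_input, endpoint_context=None) -> SamTask:
        # Convert the platform event into the standardized SamTask
        return SamTask(
            parts=[self.context.create_text_part(external_input["message"])],
            target_agent="OrchestratorAgent",
            session_id=f"my-platform-{external_input['conversation_id']}",
        )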

That is a working adapter. It receives input from your platform, creates a text part, and routes it to an agent. The framework handles everything else.

Lifecycle Methods

Control your adapter’s startup and shutdown behavior. Use init() to establish connections to your platform (WebSocket clients, HTTP servers, SDK instances) and cleanup() to gracefully release those resources when the entrypoint shuts down.

async def init(self, context: GatewayContext) -> None:
    """
    Initialize the gateway adapter.

    This is where you should:
    - Store the context for later use
    - Start platform listeners (WebSocket, HTTP server, etc.)
    - Connect to external services
    """
    self.context = context
    # Start your webhook server, SDK client, polling loop, etc.
    await self.start_platform_listener()

async def cleanup(self) -> None:
    """
    Clean up resources on shutdown.

    This is where you should:
    - Stop platform listeners
    - Close connections
    - Release resources
    """
    await self.stop_platform_listener()
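
As a rough illustration of what start_platform_listener() and stop_platform_listener() might wrap, here is a minimal sketch that runs a background listener task in init() and cancels it in cleanup(). FakeContext, PollingAdapter, and the in-memory inbox queue are hypothetical stand-ins so the example runs without Solace Agent Mesh installed; a real adapter would extend GatewayAdapter and receive a real GatewayContext.

```python
import asyncio
from typing import List, Optional


class FakeContext:
    """Hypothetical stand-in for GatewayContext; it only records submitted events."""

    def __init__(self) -> None:
        self.submitted: List[dict] = []

    async def handle_external_input(self, external_input, endpoint_context=None) -> str:
        self.submitted.append(external_input)
        return f"task-{len(self.submitted)}"


class PollingAdapter:
    """Sketch of init()/cleanup() managing one background listener task."""

    def __init__(self, inbox: asyncio.Queue) -> None:
        self.inbox = inbox  # stands in for the external platform
        self.context = None
        self._listener: Optional[asyncio.Task] = None

    async def init(self, context) -> None:
        self.context = context
        self._listener = asyncio.create_task(self._listen())

    async def _listen(self) -> None:
        # Forward every platform event to the mesh until cancelled
        while True:
            event = await self.inbox.get()
            await self.context.handle_external_input(event)
            self.inbox.task_done()

    async def cleanup(self) -> None:
        if self._listener is not None:
            self._listener.cancel()
            try:
                await self._listener
            except asyncio.CancelledError:
                pass  # expected: we cancelled the listener ourselves
            self._listener = None


async def demo() -> List[dict]:
    inbox: asyncio.Queue = asyncio.Queue()
    context = FakeContext()
    adapter = PollingAdapter(inbox)
    await adapter.init(context)
    await inbox.put({"message": "hello", "conversation_id": "c1"})
    await inbox.join()  # wait until the listener has processed the event
    await adapter.cleanup()
    return context.submitted
```

Keeping a handle to the listener task is the design point here: cleanup() can then cancel it and wait for it to finish instead of leaving it running after shutdown.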

Authentication

Extract user identity from platform events before the request is processed. The framework uses this information for access control, audit logging, and user-scoped artifact storage. Return None to fall back to a default identity from the config—useful for service accounts or anonymous access.

async def extract_auth_claims(self, external_input, endpoint_context=None):
    """
    Extract user identity from the platform event.

    Return AuthClaims with user info, or None to use config-based auth
    (falls back to default_user_identity from the config).
    """
    return AuthClaims(
        id=external_input["user_email"],
        email=external_input["user_email"],
        source="my_platform",
    )

If you return None, the framework falls back to default_user_identity from the config.
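
The fallback can be made explicit by returning None whenever the event carries no identity. The sketch below assumes a hypothetical event shape with an optional user_email key, and uses a small dataclass as a stand-in for the real AuthClaims type so that it runs on its own.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional


@dataclass
class AuthClaims:
    """Stand-in mirroring a few fields of the AuthClaims type shown in this post."""
    id: Optional[str] = None
    email: Optional[str] = None
    source: str = "platform"


async def extract_auth_claims(external_input: dict, endpoint_context=None) -> Optional[AuthClaims]:
    email = external_input.get("user_email")
    if not email:
        # No identity in the event: let the framework use default_user_identity
        return None
    return AuthClaims(id=email, email=email, source="my_platform")


async def demo():
    known = await extract_auth_claims({"user_email": "ana@example.com"})
    anonymous = await extract_auth_claims({"message": "hi"})
    return known, anonymous
```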

Handling Responses

Deliver agent responses back to your platform. The framework calls these methods as the agent streams its response. Override the handlers relevant to your platform—text for chat interfaces, files for document-heavy workflows, status updates for progress indicators.

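The base handle_update() method dispatches to individual part handlers. Override at either level depending on your needs. The signatures below reference SamFilePart and SamDataPart, which the earlier import examples do not include, so import them as well (presumably from the same adapter types module):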

async def handle_update(self, update: SamUpdate, context: ResponseContext) -> None:
    """
    Handle an update from the agent (batch handler).

    By default, this dispatches to individual part handlers.
    Override for custom batch processing—useful when you need to
    process all parts together (e.g., combining text and files
    into a single platform message).
    """
    # Default implementation iterates over parts and calls individual handlers

async def handle_text_chunk(self, text: str, context: ResponseContext) -> None:
    """
    Called for each text chunk during streaming.
    
    Use for real-time typing indicators or incremental message updates.
    Default is no-op.
    """

async def handle_file(self, file_part: SamFilePart, context: ResponseContext) -> None:
    """
    Called when the agent produces a file (report, image, data export).
    
    The file may contain inline bytes or a URI pointing to an artifact.
    Default is no-op.
    """

async def handle_data_part(self, data_part: SamDataPart, context: ResponseContext) -> None:
    """
    Called for structured data parts that are not progress updates.
    
    Use for custom metadata, tool results, or platform-specific payloads.
    Default is no-op.
    """

async def handle_status_update(self, status_text: str, context: ResponseContext) -> None:
    """
    Called when the agent reports progress (e.g., 'Searching...', 'Analyzing document...').
    
    Triggered by data parts with type 'agent_progress_update'.
    Use for progress indicators or status messages. Default is no-op.
    """

async def handle_task_complete(self, context: ResponseContext) -> None:
    """
    Called when the agent finishes processing. Always called, even on error.
    
    Use for cleanup, sending final confirmations, or removing typing indicators.
    Default is no-op.
    """

async def handle_error(self, error: SamError, context: ResponseContext) -> None:
    """
    Called when an error occurs (agent failure, timeout, cancellation).
    
    Check error.category to determine the type: 'FAILED', 'CANCELED',
    'TIMED_OUT', 'PROTOCOL_ERROR', or 'GATEWAY_ERROR'. Default is no-op.
    """

Note: The default handle_update() implementation dispatches to individual handlers automatically. It checks for agent_progress_update data parts and routes them to handle_status_update(). The individual handlers (handle_text_chunk, handle_file, etc.) are no-ops by default, so you must override the ones relevant to your platform.
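
To make the dispatch behavior concrete, here is a sketch of how a batch handler could route parts to the individual handlers. It is not the framework's implementation: TextPart, FilePart, DataPart, and Update are hypothetical stand-ins for the Sam* types, and the "status_text" key inside a progress data part is an assumed name.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class TextPart:
    text: str


@dataclass
class FilePart:
    name: str


@dataclass
class DataPart:
    data: Dict[str, Any]


@dataclass
class Update:
    parts: List[Any] = field(default_factory=list)


class RecordingAdapter:
    """Records which handler each part reached, in order."""

    def __init__(self) -> None:
        self.calls: List[tuple] = []

    async def handle_update(self, update: Update, context: Any) -> None:
        for part in update.parts:
            if isinstance(part, TextPart):
                await self.handle_text_chunk(part.text, context)
            elif isinstance(part, FilePart):
                await self.handle_file(part, context)
            elif isinstance(part, DataPart):
                if part.data.get("type") == "agent_progress_update":
                    # "status_text" is an assumed key name for this sketch
                    await self.handle_status_update(part.data.get("status_text", ""), context)
                else:
                    await self.handle_data_part(part, context)

    async def handle_text_chunk(self, text: str, context: Any) -> None:
        self.calls.append(("text", text))

    async def handle_file(self, file_part: FilePart, context: Any) -> None:
        self.calls.append(("file", file_part.name))

    async def handle_data_part(self, data_part: DataPart, context: Any) -> None:
        self.calls.append(("data", data_part.data))

    async def handle_status_update(self, status_text: str, context: Any) -> None:
        self.calls.append(("status", status_text))
```

Overriding handle_update() itself would replace this loop, which is the level to pick when text and files must be combined into a single platform message.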

Agent Registry

React to agents joining or leaving the mesh. These callbacks enable dynamic behavior—like the MCP Server entrypoint, which registers new MCP tools when agents appear and removes them when agents disconnect.

async def handle_agent_registered(self, agent_card: "AgentCard") -> None:
    """
    Called when a new agent joins the mesh.
    
    The AgentCard contains the agent's name, description, skills,
    and supported input/output modes. Use this to update dynamic
    UIs or register new capabilities.
    """

async def handle_agent_deregistered(self, agent_name: str) -> None:
    """
    Called when an agent leaves the mesh (shutdown or TTL expiry).
    
    Use this to remove the agent from menus, disable related features,
    or clean up cached data.
    """

The MCP Server entrypoint uses these to dynamically register and deregister MCP tools as agents come and go.
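
A simpler use of the same callbacks is keeping an in-memory menu of available agents. In this sketch AgentCard is a hypothetical two-field stand-in for the real card, and AgentMenu is an illustrative class rather than part of the framework.

```python
import asyncio
from dataclasses import dataclass
from typing import Dict


@dataclass
class AgentCard:
    """Stand-in with only the fields this sketch needs."""
    name: str
    description: str = ""


class AgentMenu:
    """Tracks which agents a platform UI could currently offer."""

    def __init__(self) -> None:
        self.agents: Dict[str, str] = {}

    async def handle_agent_registered(self, agent_card: AgentCard) -> None:
        # Re-registration simply refreshes the stored description
        self.agents[agent_card.name] = agent_card.description

    async def handle_agent_deregistered(self, agent_name: str) -> None:
        # pop() with a default tolerates agents that were never seen
        self.agents.pop(agent_name, None)


async def demo() -> Dict[str, str]:
    menu = AgentMenu()
    await menu.handle_agent_registered(AgentCard("OrchestratorAgent", "Routes requests"))
    await menu.handle_agent_registered(AgentCard("ReportAgent", "Builds reports"))
    await menu.handle_agent_deregistered("ReportAgent")
    await menu.handle_agent_deregistered("UnknownAgent")
    return menu.agents
```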


Types

SamTask (your input to the mesh)

class SamTask(BaseModel):
    parts: List[SamContentPart]          # Text, file, or data parts
    target_agent: str                     # Which agent to route to (required)
    session_id: Optional[str] = None      # Conversation scope
    is_streaming: bool = True             # Enable streaming responses
    platform_context: Dict[str, Any] = {} # Opaque data carried to response handlers
    session_behavior: Optional[str] = None # 'PERSISTENT' or 'RUN_BASED', overrides agent default

Note: target_agent is a required field.

Content Parts

# Text
context.create_text_part("Hello, agent!")

# File from bytes
context.create_file_part_from_bytes(
    name="document.pdf",
    content_bytes=file_bytes,
    mime_type="application/pdf",
)

# File from artifact URI
context.create_file_part_from_uri(
    uri="artifact://gateway/user/session/file.csv",
    name="file.csv",
)

# Structured data
context.create_data_part({"key": "value", "count": 42})

AuthClaims

class AuthClaims(BaseModel):
    id: Optional[str] = None        # Primary user identifier
    email: Optional[str] = None
    token: Optional[str] = None     # Bearer token or API key
    token_type: Optional[Literal["bearer", "api_key"]] = None
    source: str = "platform"
    raw_context: Dict[str, Any] = {}  # Additional platform-specific context

SamError

class SamError(BaseModel):
    message: str
    code: int
    category: Literal["FAILED", "CANCELED", "TIMED_OUT", "PROTOCOL_ERROR", "GATEWAY_ERROR"]

Check error.category to differentiate between cancellation and actual errors.

ResponseContext

class ResponseContext(BaseModel):
    task_id: str
    session_id: Optional[str]
    user_id: str
    platform_context: Dict[str, Any]  # The same dict you set in SamTask

The platform_context carries opaque data from prepare_task() through to all response handlers. Use it to store channel IDs, thread references, or anything your platform needs to route the response back.


GatewayContext: What the Framework Gives You

The GatewayContext object is passed to your adapter during init(). It provides access to framework services that handle common entrypoint needs—so you don’t have to build them yourself.

Task Submission

Submit incoming platform events to the agent mesh. This is the main entry point that orchestrates the full request lifecycle: it calls your extract_auth_claims() and prepare_task() methods, converts parts to A2A format, and publishes the task to the mesh.

task_id = await self.context.handle_external_input(external_input, endpoint_context)

The returned task_id uniquely identifies this request and can be used for cancellation or state tracking.

Task Cancellation

Allow users to abort long-running requests. When a task is canceled, the agent stops processing and your handle_error() receives a SamError with category="CANCELED".

await self.context.cancel_task(task_id)

Useful for implementing “Stop generating” buttons or handling user disconnects.
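
One way to wire a stop button, sketched under assumptions: the adapter remembers the task_id returned by handle_external_input() per conversation and cancels it when a stop action arrives. FakeContext, StoppableAdapter, on_platform_event(), and the "action" key are all hypothetical names for illustration.

```python
import asyncio
from typing import Dict, List, Optional


class FakeContext:
    """Hypothetical stand-in for GatewayContext; records cancellations."""

    def __init__(self) -> None:
        self.cancelled: List[str] = []
        self._count = 0

    async def handle_external_input(self, external_input, endpoint_context=None) -> str:
        self._count += 1
        return f"task-{self._count}"

    async def cancel_task(self, task_id: str) -> None:
        self.cancelled.append(task_id)


class StoppableAdapter:
    def __init__(self, context: FakeContext) -> None:
        self.context = context
        self.active: Dict[str, str] = {}  # conversation_id -> task_id

    async def on_platform_event(self, event: dict) -> Optional[str]:
        conversation = event["conversation_id"]
        if event.get("action") == "stop":
            task_id = self.active.pop(conversation, None)
            if task_id is not None:
                await self.context.cancel_task(task_id)
            return None
        # Normal message: submit it and remember the task for later cancellation
        task_id = await self.context.handle_external_input(event)
        self.active[conversation] = task_id
        return task_id


async def demo():
    context = FakeContext()
    adapter = StoppableAdapter(context)
    task_id = await adapter.on_platform_event({"conversation_id": "c1", "message": "hi"})
    await adapter.on_platform_event({"conversation_id": "c1", "action": "stop"})
    await adapter.on_platform_event({"conversation_id": "c1", "action": "stop"})  # no-op
    return task_id, context.cancelled
```

A fuller version would also drop the entry in handle_task_complete() so finished tasks are not cancelled later.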

Artifact Management

Access files created by agents (reports, images, data exports) or uploaded by users. Artifacts are scoped to sessions and persist across requests.

# Download an artifact by filename (version defaults to "latest")
content_bytes = await self.context.load_artifact_content(
    context=response_context,
    filename="report.csv",
    version="latest"
)

# List all artifacts available in the user's session
artifacts = await self.context.list_artifacts(response_context)

Use this when SamFilePart contains a URI instead of inline bytes—the framework stores large files as artifacts to avoid message size limits.

Agent Discovery

Query which agents are currently available in the mesh. Each agent publishes an AgentCard describing its capabilities, skills, and supported input/output modes.

agents = self.context.list_agents()  # Returns list of AgentCards

Useful for building dynamic UIs that show available agents, or for MCP-style entrypoints that expose agent skills as tools.

Feedback Collection

Capture user ratings on agent responses. Feedback is stored and can be used for analytics, model fine-tuning, or quality monitoring.

from solace_agent_mesh.gateway.adapter.types import SamFeedback

await self.context.submit_feedback(SamFeedback(
    task_id=task_id,
    session_id=session_id,
    rating="up",  # or "down"
    comment="Helpful response",
    user_id=user_id,
))

The Slack adapter uses this to implement thumbs-up/thumbs-down buttons on responses.

State Management

Store temporary data that persists across multiple events within a task or session. The framework handles expiration automatically.

# Task-level state (expires after 1 hour)
# Use for data tied to a single request: message IDs, progress indicators
self.context.set_task_state(task_id, "status_message_id", msg_id)
status_id = self.context.get_task_state(task_id, "status_message_id")

# Session-level state (expires after 24 hours)
# Use for data that spans requests: user preferences, conversation metadata
self.context.set_session_state(session_id, "preferences", {"language": "en"})
prefs = self.context.get_session_state(session_id, "preferences")

This eliminates the need to manage your own cache or database for short-lived entrypoint state.
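
A typical use of task state is posting one status message and editing it on later updates. The sketch below assumes a dict-backed FakeContext in place of the real GatewayContext (no expiry is modeled), and _post()/_edit() are hypothetical platform calls that only record what they would send.

```python
import asyncio
from types import SimpleNamespace
from typing import Any, Dict, List, Tuple


class FakeContext:
    """Dict-backed stand-in for the task-state calls; no TTL is modeled."""

    def __init__(self) -> None:
        self._task_state: Dict[Tuple[str, str], Any] = {}

    def set_task_state(self, task_id: str, key: str, value: Any) -> None:
        self._task_state[(task_id, key)] = value

    def get_task_state(self, task_id: str, key: str) -> Any:
        return self._task_state.get((task_id, key))


class StatusAdapter:
    def __init__(self, context: FakeContext) -> None:
        self.context = context
        self.posted: List[Tuple[str, str]] = []
        self.edited: List[Tuple[str, str]] = []

    async def _post(self, text: str) -> str:
        message_id = f"msg-{len(self.posted) + 1}"
        self.posted.append((message_id, text))
        return message_id

    async def _edit(self, message_id: str, text: str) -> None:
        self.edited.append((message_id, text))

    async def handle_status_update(self, status_text: str, context: Any) -> None:
        message_id = self.context.get_task_state(context.task_id, "status_message_id")
        if message_id is None:
            # First update for this task: post and remember the message ID
            message_id = await self._post(status_text)
            self.context.set_task_state(context.task_id, "status_message_id", message_id)
        else:
            # Later updates edit the same message instead of posting new ones
            await self._edit(message_id, status_text)


async def demo():
    adapter = StatusAdapter(FakeContext())
    response_context = SimpleNamespace(task_id="t1")
    await adapter.handle_status_update("Searching...", response_context)
    await adapter.handle_status_update("Analyzing document...", response_context)
    return adapter.posted, adapter.edited
```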

Timers

Schedule delayed or recurring callbacks. Useful for implementing timeouts, periodic status updates, or debouncing rapid events.

# One-shot timer: execute callback after 5 seconds
timer_id = self.context.add_timer(delay_ms=5000, callback=my_callback)

# Repeating timer: execute immediately, then every 30 seconds
timer_id = self.context.add_timer(delay_ms=0, callback=heartbeat, interval_ms=30000)

# Cancel a scheduled timer
self.context.cancel_timer(timer_id)

The Slack adapter uses timers to batch rapid streaming updates into periodic message edits, avoiding rate limits.
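
The batching idea can be sketched independently of the timer API: buffer chunks per task and let a periodic callback flush them. ChunkBatcher is a hypothetical helper, not the Slack adapter's code; in a real adapter, flush() would be driven by a repeating timer, and whether timer callbacks may be coroutines is an assumption to verify.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List


class ChunkBatcher:
    """Buffers streamed text per task and sends it in one call per flush."""

    def __init__(self, send: Callable[[str, str], Awaitable[None]]) -> None:
        self._send = send
        self._buffers: Dict[str, List[str]] = {}

    def add(self, task_id: str, text: str) -> None:
        # Called from handle_text_chunk(); cheap and never hits the platform
        self._buffers.setdefault(task_id, []).append(text)

    async def flush(self) -> None:
        # Swap the buffers first so chunks arriving mid-flush land in the next batch
        pending, self._buffers = self._buffers, {}
        for task_id, chunks in pending.items():
            await self._send(task_id, "".join(chunks))


async def demo() -> List[tuple]:
    sent: List[tuple] = []

    async def send(task_id: str, text: str) -> None:
        sent.append((task_id, text))

    batcher = ChunkBatcher(send)
    batcher.add("t1", "Hel")
    batcher.add("t1", "lo")
    batcher.add("t2", "Hi")
    await batcher.flush()
    await batcher.flush()  # nothing buffered, so nothing is sent
    return sent
```

With this shape, the number of platform calls depends on the flush interval rather than on how many chunks the agent streams.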


Config Validation with Pydantic

Define a ConfigModel on your adapter to validate adapter_config from the YAML:

from pydantic import BaseModel, Field

class MyPlatformConfig(BaseModel):
    api_token: str = Field(..., description="Platform API token")
    webhook_port: int = Field(default=9090, description="Port for incoming webhooks")
    max_message_length: int = Field(default=4000, description="Max response length")

class MyAdapter(GatewayAdapter):
    ConfigModel = MyPlatformConfig

    async def init(self, context: GatewayContext) -> None:
        self.context = context
        config = context.adapter_config  # Validated config
        self.token = config.api_token
        self.port = config.webhook_port

If the YAML config does not match your schema, the entrypoint fails at startup with a clear validation error.


Practical Patterns

Streaming with Message Queuing

For platforms that need ordered message delivery (like Slack), use an async queue to serialize updates:

import asyncio

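class MyAdapter(GatewayAdapter):
    def __init__(self):
        # task_id -> asyncio.Queue. Creating the queue and starting the
        # consumer that drains it (when the task is submitted) is not shown.
        self.message_queues = {}

    async def handle_text_chunk(self, text: str, context: ResponseContext) -> None:
        # Enqueue instead of sending directly so chunks are delivered in order
        queue = self.message_queues.get(context.task_id)
        if queue is not None:
            await queue.put(("text", text))

    async def handle_task_complete(self, context: ResponseContext) -> None:
        # Remove the queue so no further chunks are accepted for this task
        queue = self.message_queues.pop(context.task_id, None)
        if queue is not None:
            await queue.put(None)  # Sentinel to stop the consumer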
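
The consumer side of this pattern is not shown above. A minimal sketch, using only asyncio, could look like the following; consume() and deliver() are hypothetical names, and deliver() stands in for whatever call sends a message to your platform.

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple


async def consume(
    queue: asyncio.Queue,
    deliver: Callable[[str, str], Awaitable[None]],
) -> None:
    """Drain the queue in order until the None sentinel arrives."""
    while True:
        item = await queue.get()
        if item is None:
            break  # handle_task_complete() signalled the end of the task
        kind, payload = item
        await deliver(kind, payload)


async def demo() -> List[Tuple[str, str]]:
    queue: asyncio.Queue = asyncio.Queue()
    delivered: List[Tuple[str, str]] = []

    async def deliver(kind: str, payload: str) -> None:
        delivered.append((kind, payload))

    consumer = asyncio.create_task(consume(queue, deliver))
    for chunk in ("Hel", "lo"):
        await queue.put(("text", chunk))  # what handle_text_chunk() would do
    await queue.put(None)                 # what handle_task_complete() would do
    await consumer
    return delivered
```

Because a single consumer awaits each delivery before taking the next item, messages reach the platform in the order they were queued even if handlers are invoked in quick succession.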

File Handling

async def prepare_task(self, external_input, endpoint_context=None) -> SamTask:
    parts = [self.context.create_text_part(external_input["message"])]

    for file in external_input.get("attachments", []):
        parts.append(self.context.create_file_part_from_bytes(
            name=file["filename"],
            content_bytes=file["content"],
            mime_type=file.get("mime_type", "application/octet-stream"),
        ))

    return SamTask(parts=parts, target_agent="OrchestratorAgent")

async def handle_file(self, file_part: SamFilePart, context: ResponseContext) -> None:
    if file_part.content_bytes:
        await self.upload_to_platform(file_part.name, file_part.content_bytes)
    elif file_part.uri:
        content = await self.context.load_artifact_content(
            context=context,
            filename=file_part.name
        )
        if content:
            await self.upload_to_platform(file_part.name, content)

Error Handling

async def handle_error(self, error: SamError, context: ResponseContext) -> None:
    channel_id = context.platform_context.get("channel_id")

    if error.category == "CANCELED":
        await self.send_to_platform(channel_id, "Task was canceled.")
    elif error.category == "TIMED_OUT":
        await self.send_to_platform(channel_id, "Task timed out. Try again.")
    else:
        await self.send_to_platform(channel_id, f"Error: {error.message}")

Using platform_context for Routing

async def prepare_task(self, external_input, endpoint_context=None) -> SamTask:
    return SamTask(
        parts=[self.context.create_text_part(external_input["text"])],
        target_agent="OrchestratorAgent",
        session_id=f"my-platform-{external_input['thread_id']}",
        platform_context={
            "channel_id": external_input["channel_id"],
            "thread_id": external_input["thread_id"],
            "reply_to_message_id": external_input["message_id"],
        },
    )

async def handle_text_chunk(self, text: str, context: ResponseContext) -> None:
    channel = context.platform_context["channel_id"]
    thread = context.platform_context["thread_id"]
    await self.platform_client.send_message(channel, thread, text)

The Full Custom Gateway (Advanced)

If the adapter pattern does not meet your needs, you can create a full custom gateway.

Create a Gateway from Scratch

Use the CLI to generate a template:

sam add gateway my-interface

This creates a Python gateway template file that you can customize. The generated component extends BaseGatewayComponent directly and implements all protocol handling manually.

When to Use Full Custom Gateways

Consider a full custom gateway only if you need:

  • Highly specialized authentication flows not supported by the standard flow
  • Custom A2A protocol behavior or extensions
  • Complex multi-stage processing pipelines with custom state management
  • Fine-grained control over every aspect of the gateway lifecycle

Full custom gateways require significantly more code and expertise but provide complete control over the gateway behavior.

For more information, see Creating Custom Gateways.


Building and Distributing

Prerequisites

Ensure the Python build package is installed:

pip install build

Build Your Plugin

Once your adapter is working:

cd my-gateway-plugin
solace-agent-mesh plugin build

This creates a distributable package in the dist/ directory.

Distribution Options

Others can add your plugin in several ways:

From a local package:

solace-agent-mesh plugin add my-custom-entrypoint --plugin ./dist/my_gateway_plugin-0.1.0.tar.gz

From PyPI (if published):

solace-agent-mesh plugin add my-custom-entrypoint --plugin my-gateway-plugin

From a Git repository:

solace-agent-mesh plugin add my-custom-entrypoint --plugin git+https://github.com/username/my-gateway-plugin

Where:

  • my-custom-entrypoint is the name you choose for the component instance
  • The --plugin value can be a PyPI package name, a local path, or a Git URL

Last Words of Wisdom

  1. Start with prepare_task() only. Get a message flowing into the mesh before adding streaming, files, or error handling. Every other method has a default, but the response handlers default to no-ops, so override handle_text_chunk() next to see replies on your platform.

  2. Use platform_context for routing. Store channel IDs, thread references, or reply-to data in platform_context during prepare_task(). It carries through to all response handlers without any extra plumbing.

  3. The adapter never touches A2A. You work with SamTask, SamUpdate, SamTextPart, and SamFilePart. The framework translates to and from the A2A protocol.

  4. Validate your config with Pydantic. Define a ConfigModel so misconfigurations fail at startup, not at runtime when a request arrives.

  5. Use state management for multi-step interactions. Task state (1 hour TTL) is great for tracking in-progress message IDs. Session state (24 hour TTL) works for user preferences or conversation metadata.

  6. Override the handlers you need. The individual handlers (handle_text_chunk, handle_file, etc.) are no-ops by default. The base handle_update() dispatches to them automatically.

  7. Look at existing adapter implementations for reference. The Slack adapter demonstrates streaming with message queuing, file uploads, citation resolution, feedback buttons, and error handling. Check the solace-agent-mesh-core-plugins repository for complete examples.

  8. Test by mocking GatewayContext. Use MagicMock(spec=GatewayContext) and set adapter_config to a real Pydantic instance. Test prepare_task() and extract_auth_claims() as independent async functions.
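
The testing tip in item 8 might look like the sketch below. GatewayContext, SamTask, and MyAdapter are redefined here as small hypothetical stand-ins so the example runs without Solace Agent Mesh installed; in a real test you would import the framework's classes and set context.adapter_config to an instance of your Pydantic ConfigModel.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, List, Optional
from unittest.mock import MagicMock


class GatewayContext:
    """Stand-in exposing only what this test touches."""
    adapter_config: Any = None

    def create_text_part(self, text: str) -> Any:
        raise NotImplementedError


@dataclass
class SamTask:
    parts: List[Any]
    target_agent: str
    session_id: Optional[str] = None


class MyAdapter:
    async def init(self, context: GatewayContext) -> None:
        self.context = context

    async def prepare_task(self, external_input, endpoint_context=None) -> SamTask:
        return SamTask(
            parts=[self.context.create_text_part(external_input["message"])],
            target_agent="OrchestratorAgent",
            session_id=f"my-platform-{external_input['conversation_id']}",
        )


async def run_prepare_task() -> SamTask:
    # spec= makes the mock reject attributes the real class does not have
    context = MagicMock(spec=GatewayContext)
    context.create_text_part.side_effect = lambda text: {"kind": "text", "text": text}

    adapter = MyAdapter()
    await adapter.init(context)
    task = await adapter.prepare_task({"message": "hi", "conversation_id": "42"})

    context.create_text_part.assert_called_once_with("hi")
    return task
```

Using spec= means a typo such as context.create_txt_part fails in the test instead of silently returning another mock.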


Learn More


For more information about Solace Agent Mesh, visit the project repository, which describes it as an event-driven framework designed to build and orchestrate multi-agent AI systems. It enables seamless integration of AI agents with real-world data sources and systems, facilitating complex, multi-step workflows.