Tip: How to Build a Custom Entrypoint in Solace Agent Mesh
Solace Agent Mesh is an event-driven framework for building distributed ecosystems of collaborative AI agents, powered by the Solace Event Mesh for enterprise-grade agent orchestration. Solace Agent Mesh ships with entrypoints for the WebUI, REST API, Slack, Event Mesh, and more. But what happens when you need to connect your agents to a platform that does not have an entrypoint yet? You build your own.
Solace Agent Mesh provides a framework for building custom entrypoints that handles all the hard parts: A2A protocol mechanics, authentication flows, artifact management, and event mesh communication. You focus on one thing: translating between your platform and the agent mesh.
In this post, we will walk through both approaches to building a custom entrypoint, the key APIs and types you will work with, and practical patterns drawn from real-world implementations.
Two Approaches
Solace Agent Mesh offers two patterns for building custom entrypoints:
Gateway Adapter (Recommended)
You write a Python class that extends GatewayAdapter. The framework handles all A2A protocol complexity. You implement one required method (prepare_task()) and optionally override response handlers. This is the approach used by the Slack and MCP Server entrypoint plugins.
Full Custom Gateway (Advanced)
You extend BaseGatewayApp and BaseGatewayComponent directly. You handle A2A protocol details, task context management, and message routing yourself. Use this only when you need specialized authentication flows, custom A2A protocol behavior, or complex multi-stage processing pipelines.
This post focuses on the Gateway Adapter pattern, which covers the vast majority of use cases.
Getting Started
Option 1: Create as a Plugin (Recommended)
For reusable adapters that you plan to share or use across multiple projects, use the CLI:
solace-agent-mesh plugin create my-gateway-plugin
Select “Gateway Plugin” when prompted. This generates a project structure with BaseGatewayApp and BaseGatewayComponent subclasses (the full custom pattern).
To use the adapter pattern instead, you’ll need to:
- Replace the generated app.py and component.py with your GatewayAdapter subclass
- Update the config to use solace_agent_mesh.gateway.generic.app as the app_module
- Reference your adapter class in gateway_adapter
Generated structure:
my-gateway-plugin/
    pyproject.toml          # Package metadata and build config
    config.yaml             # Gateway configuration template
    README.md
    src/my_gateway_plugin/
        __init__.py
        app.py              # App class (replace for adapter pattern)
        component.py        # Component class (replace for adapter pattern)
Option 2: Add to Existing Project
For project-specific adapters, create your adapter class directly in your project:
# my_project/gateways/my_adapter.py
from solace_agent_mesh.gateway.adapter.base import GatewayAdapter
from solace_agent_mesh.gateway.adapter.types import (
    AuthClaims,
    GatewayContext,
    ResponseContext,
    SamTask,
    SamUpdate,
)

class MyAdapter(GatewayAdapter):
    async def init(self, context: GatewayContext) -> None:
        self.context = context
        # Initialize your platform connection

    async def prepare_task(self, external_input, endpoint_context=None) -> SamTask:
        # Convert platform event to SamTask
        return SamTask(
            parts=[self.context.create_text_part(external_input["message"])],
            target_agent="OrchestratorAgent",
            session_id=f"my-platform-{external_input['conversation_id']}",
        )
Configuration
For the adapter pattern, your configuration YAML should use the generic app module and point to your adapter class:
apps:
  - name: my_entrypoint_app
    app_base_path: .
    app_module: solace_agent_mesh.gateway.generic.app
    broker:
      <<: *broker_connection
    app_config:
      namespace: "${NAMESPACE}"
      gateway_id: my-entrypoint-01
      gateway_adapter: my_entrypoint.component.MyAdapter
      adapter_config:
        api_token: ${MY_PLATFORM_TOKEN}
        webhook_port: ${WEBHOOK_PORT, 9090}
      default_agent_name: OrchestratorAgent
      artifact_service:
        type: "filesystem"
        base_path: "/tmp/samv2"
        artifact_scope: namespace
      system_purpose: >
        The system is an AI assistant...
Key points:
- app_base_path is required and typically set to .
- app_module is always solace_agent_mesh.gateway.generic.app for the adapter pattern
- gateway_adapter is the Python import path to your adapter class
- adapter_config is passed to your adapter and validated against your Pydantic ConfigModel if defined
- If gateway_id is not provided, it defaults to the app name
The Gateway Adapter API
The GatewayAdapter base class defines the contract between your platform-specific code and the framework. You implement the methods that matter for your platform and let the framework handle the rest.
The One Required Method: prepare_task()
This is the only method you must implement. It converts an incoming platform event into a SamTask—the standardized format that agents understand. This is where you extract the user’s message, attach any files, set the target agent, and store platform-specific routing data.
from solace_agent_mesh.gateway.adapter.base import GatewayAdapter
from solace_agent_mesh.gateway.adapter.types import (
    SamTask, AuthClaims, SamUpdate, SamError, ResponseContext, GatewayContext
)

class MyAdapter(GatewayAdapter):
    async def prepare_task(self, external_input, endpoint_context=None) -> SamTask:
        return SamTask(
            parts=[self.context.create_text_part(external_input["message"])],
            target_agent="OrchestratorAgent",
            session_id=f"my-platform-{external_input['conversation_id']}",
        )
That is a working adapter. It receives input from your platform, creates a text part, and routes it to an agent. The framework handles everything else.
Lifecycle Methods
Control your adapter’s startup and shutdown behavior. Use init() to establish connections to your platform (WebSocket clients, HTTP servers, SDK instances) and cleanup() to gracefully release those resources when the entrypoint shuts down.
async def init(self, context: GatewayContext) -> None:
    """
    Initialize the gateway adapter.

    This is where you should:
    - Store the context for later use
    - Start platform listeners (WebSocket, HTTP server, etc.)
    - Connect to external services
    """
    self.context = context
    # Start your webhook server, SDK client, polling loop, etc.
    await self.start_platform_listener()

async def cleanup(self) -> None:
    """
    Clean up resources on shutdown.

    This is where you should:
    - Stop platform listeners
    - Close connections
    - Release resources
    """
    await self.stop_platform_listener()
Authentication
Extract user identity from platform events before the request is processed. The framework uses this information for access control, audit logging, and user-scoped artifact storage. Return None to fall back to a default identity from the config—useful for service accounts or anonymous access.
async def extract_auth_claims(self, external_input, endpoint_context=None):
    """
    Extract user identity from the platform event.

    Return AuthClaims with user info, or None to use config-based auth
    (falls back to default_user_identity from the config).
    """
    return AuthClaims(
        id=external_input["user_email"],
        email=external_input["user_email"],
        source="my_platform",
    )
If you return None, the framework falls back to default_user_identity from the config.
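The fallback path can be sketched as follows. This is a minimal illustration, not the framework's code: the `AuthClaims` dataclass is a stand-in with a subset of the fields listed under Types so the snippet runs without Solace Agent Mesh installed, and the `user_email` field on the event is a hypothetical platform detail.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class AuthClaims:
    """Stand-in for the framework's AuthClaims so this sketch runs on its own."""
    id: Optional[str] = None
    email: Optional[str] = None
    source: str = "platform"


class AuthSketch:
    """Only the auth method is shown; a real adapter would extend GatewayAdapter."""

    async def extract_auth_claims(
        self, external_input: Dict[str, Any], endpoint_context=None
    ) -> Optional[AuthClaims]:
        # "user_email" is a hypothetical field of the platform event.
        email = external_input.get("user_email")
        if not email:
            # No identity on the event: return None so the configured
            # default identity is used instead.
            return None
        return AuthClaims(id=email, email=email, source="my_platform")


if __name__ == "__main__":
    adapter = AuthSketch()
    print(asyncio.run(adapter.extract_auth_claims({"user_email": "ana@example.com"})))
    print(asyncio.run(adapter.extract_auth_claims({"message": "hi"})))
```

Using `.get()` rather than indexing keeps anonymous events from raising a KeyError before the fallback can apply.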
Handling Responses
Deliver agent responses back to your platform. The framework calls these methods as the agent streams its response. Override the handlers relevant to your platform—text for chat interfaces, files for document-heavy workflows, status updates for progress indicators.
The base handle_update() method dispatches to individual part handlers. Override at either level depending on your needs:
async def handle_update(self, update: SamUpdate, context: ResponseContext) -> None:
    """
    Handle an update from the agent (batch handler).

    By default, this dispatches to individual part handlers.
    Override for custom batch processing, which is useful when you need to
    process all parts together (e.g., combining text and files
    into a single platform message).
    """
    # Default implementation iterates over parts and calls individual handlers

async def handle_text_chunk(self, text: str, context: ResponseContext) -> None:
    """
    Called for each text chunk during streaming.
    Use for real-time typing indicators or incremental message updates.
    Default is no-op.
    """

async def handle_file(self, file_part: SamFilePart, context: ResponseContext) -> None:
    """
    Called when the agent produces a file (report, image, data export).
    The file may contain inline bytes or a URI pointing to an artifact.
    Default is no-op.
    """

async def handle_data_part(self, data_part: SamDataPart, context: ResponseContext) -> None:
    """
    Called for structured data parts that are not progress updates.
    Use for custom metadata, tool results, or platform-specific payloads.
    Default is no-op.
    """

async def handle_status_update(self, status_text: str, context: ResponseContext) -> None:
    """
    Called when the agent reports progress (e.g., 'Searching...', 'Analyzing document...').
    Triggered by data parts with type 'agent_progress_update'.
    Use for progress indicators or status messages. Default is no-op.
    """

async def handle_task_complete(self, context: ResponseContext) -> None:
    """
    Called when the agent finishes processing. Always called, even on error.
    Use for cleanup, sending final confirmations, or removing typing indicators.
    Default is no-op.
    """

async def handle_error(self, error: SamError, context: ResponseContext) -> None:
    """
    Called when an error occurs (agent failure, timeout, cancellation).
    Check error.category to determine the type: 'FAILED', 'CANCELED',
    'TIMED_OUT', 'PROTOCOL_ERROR', or 'GATEWAY_ERROR'. Default is no-op.
    """
Note: The default handle_update() implementation dispatches to individual handlers automatically. It checks for agent_progress_update data parts and routes them to handle_status_update(). The individual handlers (handle_text_chunk, handle_file, etc.) are no-ops by default, so you must override the ones relevant to your platform.
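To make the routing described in the note concrete, here is a rough sketch of what such a dispatcher could look like. It is not the framework's implementation: the part classes are stand-ins, `handle_update()` takes a plain list of parts instead of a SamUpdate, and the `"type"` and `"status_text"` keys inside the data payload are assumptions made for illustration.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Dict, List, Union


# Stand-in part types; the real Sam*Part models live in the framework.
@dataclass
class TextPart:
    text: str


@dataclass
class FilePart:
    name: str


@dataclass
class DataPart:
    data: Dict[str, Any] = field(default_factory=dict)


Part = Union[TextPart, FilePart, DataPart]


class RecordingAdapter:
    """Records which handler each part reaches."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    async def handle_text_chunk(self, text: str, context: Any) -> None:
        self.calls.append(f"text:{text}")

    async def handle_file(self, file_part: FilePart, context: Any) -> None:
        self.calls.append(f"file:{file_part.name}")

    async def handle_status_update(self, status_text: str, context: Any) -> None:
        self.calls.append(f"status:{status_text}")

    async def handle_data_part(self, data_part: DataPart, context: Any) -> None:
        self.calls.append("data")

    async def handle_update(self, parts: List[Part], context: Any) -> None:
        for part in parts:
            if isinstance(part, TextPart):
                await self.handle_text_chunk(part.text, context)
            elif isinstance(part, FilePart):
                await self.handle_file(part, context)
            elif part.data.get("type") == "agent_progress_update":
                # Progress updates are routed away from generic data handling.
                await self.handle_status_update(part.data.get("status_text", ""), context)
            else:
                await self.handle_data_part(part, context)
```

Because every part goes through one method, overriding `handle_update()` is the natural place to combine text and files into a single platform message, while overriding the individual handlers keeps the default routing.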
Agent Registry
React to agents joining or leaving the mesh. These callbacks enable dynamic behavior—like the MCP Server entrypoint, which registers new MCP tools when agents appear and removes them when agents disconnect.
async def handle_agent_registered(self, agent_card: "AgentCard") -> None:
    """
    Called when a new agent joins the mesh.
    The AgentCard contains the agent's name, description, skills,
    and supported input/output modes. Use this to update dynamic
    UIs or register new capabilities.
    """

async def handle_agent_deregistered(self, agent_name: str) -> None:
    """
    Called when an agent leaves the mesh (shutdown or TTL expiry).
    Use this to remove the agent from menus, disable related features,
    or clean up cached data.
    """
The MCP Server entrypoint uses these to dynamically register and deregister MCP tools as agents come and go.
Types
SamTask (your input to the mesh)
class SamTask(BaseModel):
    parts: List[SamContentPart]              # Text, file, or data parts
    target_agent: str                        # Which agent to route to (required)
    session_id: Optional[str] = None         # Conversation scope
    is_streaming: bool = True                # Enable streaming responses
    platform_context: Dict[str, Any] = {}    # Opaque data carried to response handlers
    session_behavior: Optional[str] = None   # 'PERSISTENT' or 'RUN_BASED', overrides agent default
Note: target_agent is a required field.
Content Parts
# Text
context.create_text_part("Hello, agent!")

# File from bytes
context.create_file_part_from_bytes(
    name="document.pdf",
    content_bytes=file_bytes,
    mime_type="application/pdf",
)

# File from artifact URI
context.create_file_part_from_uri(
    uri="artifact://gateway/user/session/file.csv",
    name="file.csv",
)

# Structured data
context.create_data_part({"key": "value", "count": 42})
AuthClaims
class AuthClaims(BaseModel):
    id: Optional[str] = None          # Primary user identifier
    email: Optional[str] = None
    token: Optional[str] = None       # Bearer token or API key
    token_type: Optional[Literal["bearer", "api_key"]] = None
    source: str = "platform"
    raw_context: Dict[str, Any] = {}  # Additional platform-specific context
SamError
class SamError(BaseModel):
    message: str
    code: int
    category: Literal["FAILED", "CANCELED", "TIMED_OUT", "PROTOCOL_ERROR", "GATEWAY_ERROR"]
Check error.category to differentiate between cancellation and actual errors.
ResponseContext
class ResponseContext(BaseModel):
    task_id: str
    session_id: Optional[str]
    user_id: str
    platform_context: Dict[str, Any]  # The same dict you set in SamTask
The platform_context carries opaque data from prepare_task() through to all response handlers. Use it to store channel IDs, thread references, or anything your platform needs to route the response back.
GatewayContext: What the Framework Gives You
The GatewayContext object is passed to your adapter during init(). It provides access to framework services that handle common entrypoint needs—so you don’t have to build them yourself.
Task Submission
Submit incoming platform events to the agent mesh. This is the main entry point that orchestrates the full request lifecycle: it calls your extract_auth_claims() and prepare_task() methods, converts parts to A2A format, and publishes the task to the mesh.
task_id = await self.context.handle_external_input(external_input, endpoint_context)
The returned task_id uniquely identifies this request and can be used for cancellation or state tracking.
Task Cancellation
Allow users to abort long-running requests. When cancelled, the agent stops processing and your handle_error() receives a SamError with category="CANCELED".
await self.context.cancel_task(task_id)
Useful for implementing “Stop generating” buttons or handling user disconnects.
Artifact Management
Access files created by agents (reports, images, data exports) or uploaded by users. Artifacts are scoped to sessions and persist across requests.
# Download an artifact by filename (version defaults to "latest")
content_bytes = await self.context.load_artifact_content(
    context=response_context,
    filename="report.csv",
    version="latest",
)
# List all artifacts available in the user's session
artifacts = await self.context.list_artifacts(response_context)
Use this when SamFilePart contains a URI instead of inline bytes—the framework stores large files as artifacts to avoid message size limits.
Agent Discovery
Query which agents are currently available in the mesh. Each agent publishes an AgentCard describing its capabilities, skills, and supported input/output modes.
agents = self.context.list_agents() # Returns list of AgentCards
Useful for building dynamic UIs that show available agents, or for MCP-style entrypoints that expose agent skills as tools.
Feedback Collection
Capture user ratings on agent responses. Feedback is stored and can be used for analytics, model fine-tuning, or quality monitoring.
from solace_agent_mesh.gateway.adapter.types import SamFeedback
await self.context.submit_feedback(SamFeedback(
    task_id=task_id,
    session_id=session_id,
    rating="up",  # or "down"
    comment="Helpful response",
    user_id=user_id,
))
The Slack adapter uses this to implement thumbs-up/thumbs-down buttons on responses.
State Management
Store temporary data that persists across multiple events within a task or session. The framework handles expiration automatically.
# Task-level state (expires after 1 hour)
# Use for data tied to a single request: message IDs, progress indicators
self.context.set_task_state(task_id, "status_message_id", msg_id)
status_id = self.context.get_task_state(task_id, "status_message_id")
# Session-level state (expires after 24 hours)
# Use for data that spans requests: user preferences, conversation metadata
self.context.set_session_state(session_id, "preferences", {"language": "en"})
prefs = self.context.get_session_state(session_id, "preferences")
This eliminates the need to manage your own cache or database for short-lived entrypoint state.
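One common use of task state is editing a single status message in place instead of posting a new one for every progress update. The sketch below shows that flow under stated assumptions: `FakeContext` is an in-memory stand-in for the two task-state calls shown above (no expiry, and it assumes a missing key reads back as None), while `FakePlatform` with `send_message` and `edit_message` is a hypothetical chat client.

```python
import asyncio
from typing import Any, Dict, List, Optional, Tuple


class FakeContext:
    """In-memory stand-in for set_task_state/get_task_state (no expiry)."""

    def __init__(self) -> None:
        self._task_state: Dict[Tuple[str, str], Any] = {}

    def set_task_state(self, task_id: str, key: str, value: Any) -> None:
        self._task_state[(task_id, key)] = value

    def get_task_state(self, task_id: str, key: str) -> Optional[Any]:
        return self._task_state.get((task_id, key))


class FakePlatform:
    """Hypothetical chat client that records what it was asked to do."""

    def __init__(self) -> None:
        self.log: List[str] = []
        self._next_id = 1

    async def send_message(self, text: str) -> int:
        msg_id = self._next_id
        self._next_id += 1
        self.log.append(f"send#{msg_id}:{text}")
        return msg_id

    async def edit_message(self, msg_id: int, text: str) -> None:
        self.log.append(f"edit#{msg_id}:{text}")


async def show_status(ctx: FakeContext, platform: FakePlatform, task_id: str, text: str) -> None:
    msg_id = ctx.get_task_state(task_id, "status_message_id")
    if msg_id is None:
        # First status for this task: post a message and remember its ID.
        msg_id = await platform.send_message(text)
        ctx.set_task_state(task_id, "status_message_id", msg_id)
    else:
        # Later statuses reuse the stored ID and edit in place.
        await platform.edit_message(msg_id, text)
```

In an adapter, the body of `show_status` would sit inside `handle_status_update()`, using `context.task_id` as the key.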
Timers
Schedule delayed or recurring callbacks. Useful for implementing timeouts, periodic status updates, or debouncing rapid events.
# One-shot timer: execute callback after 5 seconds
timer_id = self.context.add_timer(delay_ms=5000, callback=my_callback)
# Repeating timer: execute immediately, then every 30 seconds
timer_id = self.context.add_timer(delay_ms=0, callback=heartbeat, interval_ms=30000)
# Cancel a scheduled timer
self.context.cancel_timer(timer_id)
The Slack adapter uses timers to batch rapid streaming updates into periodic message edits, avoiding rate limits.
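The batching idea can be sketched independently of the framework. The `ChunkBatcher` class below is hypothetical and is not taken from the Slack adapter: chunks accumulate in a buffer, and `flush()` is the function you would hand to a repeating timer. This post does not say whether `add_timer` expects a synchronous or asynchronous callback, so the sketch simply calls `flush()` by hand to simulate timer ticks.

```python
from typing import Callable, List


class ChunkBatcher:
    """Buffers streamed text and emits at most one edit per flush."""

    def __init__(self, send_edit: Callable[[str], None]) -> None:
        self._send_edit = send_edit    # hypothetical "edit message" call
        self._pending: List[str] = []  # chunks received since the last flush
        self._full_text = ""           # everything shown so far

    def add(self, chunk: str) -> None:
        self._pending.append(chunk)

    def flush(self) -> None:
        # Meant to run on a repeating timer; does nothing if no new text arrived.
        if not self._pending:
            return
        self._full_text += "".join(self._pending)
        self._pending.clear()
        self._send_edit(self._full_text)


edits: List[str] = []
batcher = ChunkBatcher(edits.append)
for chunk in ["Hel", "lo, ", "wor"]:
    batcher.add(chunk)
batcher.flush()       # first tick: one edit for three chunks
batcher.flush()       # nothing new, so no edit is sent
batcher.add("ld!")
batcher.flush()       # second edit carries the full text
print(edits)          # ['Hello, wor', 'Hello, world!']
```

Three chunks collapse into one platform call, and an idle tick costs nothing, which is what keeps the edit rate below a platform's limit.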
Config Validation with Pydantic
Define a ConfigModel on your adapter to validate adapter_config from the YAML:
from pydantic import BaseModel, Field

class MyPlatformConfig(BaseModel):
    api_token: str = Field(..., description="Platform API token")
    webhook_port: int = Field(default=9090, description="Port for incoming webhooks")
    max_message_length: int = Field(default=4000, description="Max response length")

class MyAdapter(GatewayAdapter):
    ConfigModel = MyPlatformConfig

    async def init(self, context: GatewayContext) -> None:
        self.context = context
        config = context.adapter_config  # Validated config
        self.token = config.api_token
        self.port = config.webhook_port
If the YAML config does not match your schema, the entrypoint fails at startup with a clear validation error.
Practical Patterns
Streaming with Message Queuing
For platforms that need ordered message delivery (like Slack), use an async queue to serialize updates:
import asyncio

class MyAdapter(GatewayAdapter):
    def __init__(self):
        self.message_queues = {}

    async def handle_text_chunk(self, text: str, context: ResponseContext) -> None:
        queue = self.message_queues.get(context.task_id)
        if queue:
            await queue.put(("text", text))

    async def handle_task_complete(self, context: ResponseContext) -> None:
        queue = self.message_queues.pop(context.task_id, None)
        if queue:
            await queue.put(None)  # Sentinel to stop the consumer
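The snippet above shows only the producer side. A self-contained sketch of the whole loop, including where the queue is created and how a consumer drains it, might look like this. `QueueingSketch`, `start_task`, and the `delivered` list are illustrative names, and the handlers take a plain `task_id` instead of a ResponseContext so the example runs without the framework.

```python
import asyncio
from typing import Dict, List


class QueueingSketch:
    def __init__(self) -> None:
        self.message_queues: Dict[str, asyncio.Queue] = {}
        self.delivered: List[str] = []  # stands in for real platform calls

    def start_task(self, task_id: str) -> asyncio.Task:
        # Create the queue and its consumer before any chunk arrives.
        queue: asyncio.Queue = asyncio.Queue()
        self.message_queues[task_id] = queue
        return asyncio.create_task(self._consume(queue))

    async def _consume(self, queue: asyncio.Queue) -> None:
        # A single consumer per task guarantees in-order delivery.
        while True:
            item = await queue.get()
            if item is None:  # sentinel: the task finished
                break
            kind, payload = item
            self.delivered.append(f"{kind}:{payload}")

    async def handle_text_chunk(self, text: str, task_id: str) -> None:
        queue = self.message_queues.get(task_id)
        if queue is not None:
            await queue.put(("text", text))

    async def handle_task_complete(self, task_id: str) -> None:
        queue = self.message_queues.pop(task_id, None)
        if queue is not None:
            await queue.put(None)


async def demo() -> List[str]:
    adapter = QueueingSketch()
    consumer = adapter.start_task("t1")
    await adapter.handle_text_chunk("Hello", "t1")
    await adapter.handle_text_chunk("world", "t1")
    await adapter.handle_task_complete("t1")
    await consumer  # wait until the queue is drained
    return adapter.delivered


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In a real adapter, a reasonable place to call something like `start_task` is right after `handle_external_input()` returns the task ID, though that placement is a design choice rather than something the framework prescribes.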
File Handling
async def prepare_task(self, external_input, endpoint_context=None) -> SamTask:
    parts = [self.context.create_text_part(external_input["message"])]
    for file in external_input.get("attachments", []):
        parts.append(self.context.create_file_part_from_bytes(
            name=file["filename"],
            content_bytes=file["content"],
            mime_type=file.get("mime_type", "application/octet-stream"),
        ))
    return SamTask(parts=parts, target_agent="OrchestratorAgent")

async def handle_file(self, file_part: SamFilePart, context: ResponseContext) -> None:
    if file_part.content_bytes:
        await self.upload_to_platform(file_part.name, file_part.content_bytes)
    elif file_part.uri:
        content = await self.context.load_artifact_content(
            context=context,
            filename=file_part.name,
        )
        if content:
            await self.upload_to_platform(file_part.name, content)
Error Handling
async def handle_error(self, error: SamError, context: ResponseContext) -> None:
    channel_id = context.platform_context.get("channel_id")
    if error.category == "CANCELED":
        await self.send_to_platform(channel_id, "Task was canceled.")
    elif error.category == "TIMED_OUT":
        await self.send_to_platform(channel_id, "Task timed out. Try again.")
    else:
        await self.send_to_platform(channel_id, f"Error: {error.message}")
Using platform_context for Routing
async def prepare_task(self, external_input, endpoint_context=None) -> SamTask:
    return SamTask(
        parts=[self.context.create_text_part(external_input["text"])],
        target_agent="OrchestratorAgent",
        session_id=f"my-platform-{external_input['thread_id']}",
        platform_context={
            "channel_id": external_input["channel_id"],
            "thread_id": external_input["thread_id"],
            "reply_to_message_id": external_input["message_id"],
        },
    )

async def handle_text_chunk(self, text: str, context: ResponseContext) -> None:
    channel = context.platform_context["channel_id"]
    thread = context.platform_context["thread_id"]
    await self.platform_client.send_message(channel, thread, text)
The Full Custom Gateway (Advanced)
If the adapter pattern does not meet your needs, you can create a full custom gateway.
Create a Gateway from Scratch
Use the CLI to generate a template:
sam add gateway my-interface
This creates a Python gateway template file which you can customize. The generated component extends BaseGatewayComponent directly and implements all protocol handling manually.
When to Use Full Custom Gateways
Consider a full custom gateway only if you need:
- Highly specialized authentication flows not supported by the standard flow
- Custom A2A protocol behavior or extensions
- Complex multi-stage processing pipelines with custom state management
- Fine-grained control over every aspect of the gateway lifecycle
Full custom gateways require significantly more code and expertise but provide complete control over the gateway behavior.
For more information, see Creating Custom Gateways.
Building and Distributing
Prerequisites
Ensure the Python build package is installed:
pip install build
Build Your Plugin
Once your adapter is working:
cd my-gateway-plugin
solace-agent-mesh plugin build
This creates a distributable package in the dist/ directory.
Distribution Options
Others can add your plugin in several ways:
From a local package:
solace-agent-mesh plugin add my-custom-entrypoint --plugin ./dist/my_gateway_plugin-0.1.0.tar.gz
From PyPI (if published):
solace-agent-mesh plugin add my-custom-entrypoint --plugin my-gateway-plugin
From a Git repository:
solace-agent-mesh plugin add my-custom-entrypoint --plugin git+https://github.com/username/my-gateway-plugin
Where:
- my-custom-entrypoint is the name you choose for the component instance
- The --plugin value can be a PyPI package name, a local path, or a Git URL
Last Words of Wisdom
- Start with prepare_task() only. Get a message flowing end-to-end before adding streaming, files, or error handling. The framework provides sensible defaults for everything else.
- Use platform_context for routing. Store channel IDs, thread references, or reply-to data in platform_context during prepare_task(). It carries through to all response handlers without any extra plumbing.
- The adapter never touches A2A. You work with SamTask, SamUpdate, SamTextPart, and SamFilePart. The framework translates to and from the A2A protocol.
- Validate your config with Pydantic. Define a ConfigModel so misconfigurations fail at startup, not at runtime when a request arrives.
- Use state management for multi-step interactions. Task state (1 hour TTL) is great for tracking in-progress message IDs. Session state (24 hour TTL) works for user preferences or conversation metadata.
- Override the handlers you need. The individual handlers (handle_text_chunk, handle_file, etc.) are no-ops by default. The base handle_update() dispatches to them automatically.
- Look at existing adapter implementations for reference. The Slack adapter demonstrates streaming with message queuing, file uploads, citation resolution, feedback buttons, and error handling. Check the solace-agent-mesh-core-plugins repository for complete examples.
- Test by mocking GatewayContext. Use MagicMock(spec=GatewayContext) and set adapter_config to a real Pydantic instance. Test prepare_task() and extract_auth_claims() as independent async functions.
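The mocking tip can be sketched with the standard library alone. To keep the snippet runnable without Solace Agent Mesh or Pydantic installed, `GatewayContext` and `SamTask` below are stand-ins that expose only what the test touches, and `adapter_config` is left out. In a real test you would import the framework types and pass `spec=GatewayContext` from the actual module.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Dict, List
from unittest.mock import MagicMock


class GatewayContext:
    """Stand-in exposing only the method this test touches."""

    def create_text_part(self, text: str) -> Dict[str, str]:
        raise NotImplementedError


@dataclass
class SamTask:
    """Stand-in with a subset of the fields listed under Types."""
    parts: List[Any]
    target_agent: str
    session_id: str


class MyAdapter:
    async def init(self, context: GatewayContext) -> None:
        self.context = context

    async def prepare_task(self, external_input, endpoint_context=None) -> SamTask:
        return SamTask(
            parts=[self.context.create_text_part(external_input["message"])],
            target_agent="OrchestratorAgent",
            session_id=f"my-platform-{external_input['conversation_id']}",
        )


def check_prepare_task() -> SamTask:
    # spec= makes the mock reject attributes the context class does not define.
    context = MagicMock(spec=GatewayContext)
    context.create_text_part.side_effect = lambda text: {"text": text}

    async def run() -> SamTask:
        adapter = MyAdapter()
        await adapter.init(context)
        return await adapter.prepare_task({"message": "hi", "conversation_id": "42"})

    task = asyncio.run(run())
    assert task.session_id == "my-platform-42"
    assert task.parts == [{"text": "hi"}]
    context.create_text_part.assert_called_once_with("hi")
    return task


if __name__ == "__main__":
    print(check_prepare_task())
```

Because `prepare_task()` only reads from its input and the context, it can be exercised this way without a broker or a running mesh.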
Learn More
- Creating Custom Gateways: Official Documentation
- Gateways Overview: Gateways
- Plugins: Plugins Documentation
- Core Plugins Repository: solace-agent-mesh-core-plugins
- Repository: github.com/SolaceLabs/solace-agent-mesh