External Communication with Agent Workflows¶
External parties can communicate with Agent workflows by sending Data messages to trigger specific operations. This enables integration with external systems, APIs, and applications.
Message Communication¶
Message Types and Threads¶
External systems send Data messages to Agent workflows. These messages are part of message threads that organize conversations and context.
Communication Protocols¶
External parties can send messages through:
- REST API: HTTP requests for synchronous communication
- WebSocket: Real-time bidirectional communication
- Server-Sent Events (SSE): Server-to-client streaming
Detailed API documentation: User API Reference
Calling the Agent - HTTP¶
Both REST and WebSocket clients can send Data messages:
REST API - Synchronous¶
// REST API call
const response = await fetch('/api/user/rest/converse', {
method: 'POST',
params: {
workflow: 'MyAgent:MyFlow',
type: 'Data',
text: 'ProcessDocument', // Method name
participantId: 'user@example.com'
},
body: JSON.stringify({ // Method arguments
documentId: 'doc-123',
options: { validate: true }
})
});
See REST API for more details. There are Synchronous and Asynchronous methods available.
WebSocket¶
// WebSocket using TypeScript SDK
await connection.invoke("SendInboundMessage", {
participantId: "user-123",
workflow: "customer-support",
type: "Data",
text: "ProcessDocument",
data: {
documentId: 'doc-123',
options: { validate: true }
}
}, "Data");
See WebSocket Client for more details.
On Agent - RPC Invocation¶
Data Message Structure¶
Workflows receive Data messages using RPC-style method invocation:
text: Method name to invokedata: Method arguments (JSON)
Setup Data Processor¶
Configure data processing in your workflow:
var agent = new AgentTeam("Document Agent");
var flow = agent.AddAgent<DocumentFlow>();
// Option 1: Process in Temporal Activity (default)
flow.SetDataProcessor<DocumentProcessor>();
// Option 2: Process in Temporal Workflow
flow.SetDataProcessor<DocumentProcessor>(processInWorkflow: true);
Processing Location¶
Use the processInWorkflow parameter to control execution:
| Parameter Value | Execution Location | Use Cases | Benefits |
|---|---|---|---|
false (default) | Temporal Activity | • Stateless, one-off operations • Simple data processing | • Direct IO operations |
true | Temporal Workflow | • Complex orchestration scenarios • When you need Temporal business process capabilities | • Access to workflow features (timers, signals, queries) |
Data Processor Implementation¶
public class DocumentProcessor
{
public DocumentProcessor(MessageThread messageThread) { }
// Method invoked by external 'ProcessDocument' calls
public async Task<DocumentResult> ProcessDocument(DocumentRequest request)
{
// Process the document
return new DocumentResult
{
Id = request.DocumentId,
Status = "processed",
ValidationResults = await ValidateDocument(request)
};
}
// Method invoked by external 'GetStatus' calls
public DocumentStatus GetStatus(string documentId)
{
return _documentService.GetStatus(documentId);
}
}
Accessing the Flow Instance¶
When processInWorkflow: true, you can receive the active Flow instance via the processor's constructor for workflow state management:
public class DocumentProcessor
{
private readonly MessageThread _messageThread;
private readonly DocumentFlow _flow;
// In workflow mode: Both MessageThread and Flow instance available
public DocumentProcessor(MessageThread messageThread, DocumentFlow flow)
{
_messageThread = messageThread;
_flow = flow;
}
public async Task<DocumentResult> ProcessDocument(DocumentRequest request)
{
// Access workflow state through _flow instance
_flow.DocumentsProcessed++;
return new DocumentResult { /* ... */ };
}
}
Constructor Parameters by Mode:
- Activity mode (
processInWorkflow: false): OnlyMessageThreadparameter allowed - Workflow mode (
processInWorkflow: true): BothMessageThreadand Flow instance parameters available
Lifecycle and State:
- Data processor instances are created per invocation
- For state management across invocations, use the passed Flow instance when in workflow mode
- In activity mode, use external storage or the MessageThread for state coordination
For detailed implementation guidance, see Handling Data as RPC.
Example¶
Document Processing Service¶
public class DocumentProcessor
{
public async Task<ProcessingResult> ProcessDocument(DocumentRequest request)
{
var result = await _processor.ProcessAsync(request.Content);
return new ProcessingResult
{
DocumentId = request.DocumentId,
ProcessedAt = DateTime.UtcNow,
Status = "completed",
ExtractedData = result.Data
};
}
}
External API Call¶
POST /api/user/rest/converse?workflow=DocumentAgent:DocumentFlow&type=Data&text=ProcessDocument
{
"DocumentId": "doc-123",
"Content": "base64-encoded-content",
"Options": {
"extractMetadata": true,
"validateFormat": true
}
}
Response¶
{
"requestId": "req-456",
"threadId": "thread-789",
"response": {
"text": null,
"data": {
"DocumentId": "doc-123",
"ProcessedAt": "2024-01-15T10:30:00Z",
"Status": "completed",
"ExtractedData": { /* processed data */ }
},
"direction": 1,
"messageType": 1,
"scope": null,
"hint": null
}
}
This approach enables seamless integration between external systems and Agent workflows through standardized Data message communication.