Handling Data Messages¶
When sending structured data messages to agents, you have two distinct approaches available, each designed for different use cases and requirements.
Two Approaches Overview¶
1. Data as Messages¶
Process data messages manually within the Temporal workflow class using event listeners and queuing mechanisms.
Data Message Handling Documentation
For complete implementation details, see Handling Data Messages.
2. Data as RPC (DataProcessor)¶
Use a simplified RPC-style approach by setting a DataProcessor class to handle data messages as atomic operations.
Data Processing PRC Implementation
For complete implementation details, see RPC Data Processing.
When to Use Each Approach¶
Use Manual Data Handling When¶
- ✅ You need long-running workflows with complex orchestration
- ✅ You require Temporal workflow capabilities (retries, timers, child workflows)
- ✅ You need to coordinate multiple activities over time
- ✅ You need durability and fault tolerance for complex business processes
Use RPC Abstraction When¶
- ✅ You need simple, atomic operations (request → process → respond)
- ✅ You want immediate responses without workflow overhead
- ✅ You prefer simplified development with less boilerplate
- ✅ You don't need Temporal orchestration features
Comparison Matrix¶
Feature | Manual Handling | RPC Abstraction |
---|---|---|
Execution Context | Full Temporal Workflow | Single Temporal Activity |
Orchestration | ✅ Full capabilities | ❌ Not available |
Complexity | Higher (manual queuing) | Lower (automatic handling) |
Fault Tolerance | ✅ Full Temporal guarantees | Limited (activity-level only) |
Use Case | Complex business processes | Simple data calls |
Data messages require explicit handling in agent flows, unlike chat messages which are automatically processed by SemanticKernel.
Chat vs Data Messages¶
Chat Messages:
- Automatically intercepted by SemanticKernel when
await InitConversation()
is called under the[WorkflowRun]
method - Routed directly to Semantic Kernel for processing
Data Messages:
- NOT routed to Semantic Kernel
- Require explicit listeners and manual processing
- Best practice: Queue messages and process them in the
[WorkflowRun]
method
Implementation Pattern¶
1. Setup Data Handler Subscription¶
Subscribe to data messages in your flow constructor:
public DocumentDataFlow()
{
_messageHub.SubscribeDataHandler(
(MessageThread thread) => {
_logger.LogInformation($"MessageThread received: {thread.ThreadId}");
_messageQueue.Enqueue(thread);
});
}
2. Queue Messages¶
Use a concurrent queue to store incoming data messages:
3. Process Messages in WorkflowRun¶
Handle queued messages in your main workflow loop:
[WorkflowRun]
public async Task Run()
{
while (true)
{
var messageThread = await DequeueMessage();
if (messageThread == null) continue;
var messageType = Message.GetMessageType(messageThread);
if (messageType == nameof(FetchDocument))
{
await HandleDocumentRequest(messageThread);
}
}
}
4. Dequeue Messages Deterministically¶
private async Task<MessageThread?> DequeueMessage()
{
await Workflow.WaitConditionAsync(() => _messageQueue.Count > 0);
return _messageQueue.TryDequeue(out var thread) ? thread : null;
}
Complete Example¶
[Workflow("Document Data Flow")]
public class DocumentDataFlow : FlowBase
{
private readonly ConcurrentQueue<MessageThread> _messageQueue = new();
private static readonly Logger<DocumentDataFlow> _logger = Logger<DocumentDataFlow>.For();
public DocumentDataFlow()
{
_messageHub.SubscribeDataHandler(
(MessageThread thread) => {
_logger.LogInformation($"MessageThread received: {thread.ThreadId}");
_messageQueue.Enqueue(thread);
});
}
[WorkflowRun]
public async Task Run()
{
while (true)
{
try
{
var messageThread = await DequeueMessage();
if (messageThread == null) continue;
var messageType = Message.GetMessageType(messageThread);
if (messageType == nameof(FetchDocument))
{
await HandleDocumentRequest(messageThread);
}
}
catch (Exception ex)
{
_logger.LogError("Error processing data message", ex);
}
}
}
private async Task HandleDocumentRequest(MessageThread messageThread)
{
var documentRequest = FetchDocument.FromThread(messageThread);
var result = await Workflow.ExecuteActivityAsync(
(IDocumentDataActivities act) => act.ValidateDocument(documentRequest.DocumentId),
_activityOptions);
await messageThread.SendData(new DocumentResponse {
AuditResult = result,
});
}
}
Key Points¶
- Data messages bypass SemanticKernel and require manual handling
- Always use deterministic queuing with
Workflow.WaitConditionAsync()
- Process messages within the
[WorkflowRun]
method for proper Temporal workflow execution - Handle exceptions to prevent workflow failures