Improve e2e integration tests and isolate tests from other things; includes patch to Serializer (#5497)

* integration tests used to use the samples - now they are separate
* patch dictionary problem in serializer
* add Message Registry with dead letter queue that gets checked on new
subs.
This commit is contained in:
Ryan Sweet 2025-02-13 16:43:57 -08:00 committed by GitHub
parent 3abc022ca9
commit ff7f863e73
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
40 changed files with 1425 additions and 136 deletions

View File

@ -140,7 +140,13 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.AutoGen.AgentChat
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.AutoGen.AgentChat.Tests", "test\Microsoft.AutoGen.AgentChat.Tests\Microsoft.AutoGen.AgentChat.Tests.csproj", "{217A4F86-8ADD-4998-90BA-880092A019F5}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "HelloAgent.AppHost", "test\Microsoft.AutoGen.Integration.Tests.AppHosts\HelloAgent.AppHost\HelloAgent.AppHost.csproj", "{0C371D65-7EF9-44EA-8128-A105DA82A80E}"
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Microsoft.AutoGen.Integration.Tests.AppHosts", "Microsoft.AutoGen.Integration.Tests.AppHosts", "{D1C2B0BB-1276-4146-A699-D1983AE8ED04}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "HelloAgentTests", "test\Microsoft.AutoGen.Integration.Tests.AppHosts\HelloAgentTests\HelloAgentTests.csproj", "{CD10E29A-725E-4BEF-9CFF-6C0E0A652926}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "InMemoryTests.AppHost", "test\Microsoft.AutoGen.Integration.Tests.AppHosts\InMemoryTests.AppHost\InMemoryTests.AppHost.csproj", "{1E4E1ED4-7701-4A05-A861-64461C3B1EE3}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "XlangTests.AppHost", "test\Microsoft.AutoGen.Integration.Tests.AppHosts\XLangTests.AppHost\XlangTests.AppHost.csproj", "{62CDFB27-3B02-4D4B-B789-8AAD5E20688A}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
@ -372,6 +378,18 @@ Global
{0C371D65-7EF9-44EA-8128-A105DA82A80E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{0C371D65-7EF9-44EA-8128-A105DA82A80E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{0C371D65-7EF9-44EA-8128-A105DA82A80E}.Release|Any CPU.Build.0 = Release|Any CPU
{CD10E29A-725E-4BEF-9CFF-6C0E0A652926}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{CD10E29A-725E-4BEF-9CFF-6C0E0A652926}.Debug|Any CPU.Build.0 = Debug|Any CPU
{CD10E29A-725E-4BEF-9CFF-6C0E0A652926}.Release|Any CPU.ActiveCfg = Release|Any CPU
{CD10E29A-725E-4BEF-9CFF-6C0E0A652926}.Release|Any CPU.Build.0 = Release|Any CPU
{1E4E1ED4-7701-4A05-A861-64461C3B1EE3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{1E4E1ED4-7701-4A05-A861-64461C3B1EE3}.Debug|Any CPU.Build.0 = Debug|Any CPU
{1E4E1ED4-7701-4A05-A861-64461C3B1EE3}.Release|Any CPU.ActiveCfg = Release|Any CPU
{1E4E1ED4-7701-4A05-A861-64461C3B1EE3}.Release|Any CPU.Build.0 = Release|Any CPU
{62CDFB27-3B02-4D4B-B789-8AAD5E20688A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{62CDFB27-3B02-4D4B-B789-8AAD5E20688A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{62CDFB27-3B02-4D4B-B789-8AAD5E20688A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{62CDFB27-3B02-4D4B-B789-8AAD5E20688A}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@ -436,7 +454,10 @@ Global
{EF954ED3-87D5-40F1-8557-E7179F43EA0E} = {18BF8DD7-0585-48BF-8F97-AD333080CE06}
{7F828599-56E8-4597-8F68-EE26FD631417} = {18BF8DD7-0585-48BF-8F97-AD333080CE06}
{217A4F86-8ADD-4998-90BA-880092A019F5} = {F823671B-3ECA-4AE6-86DA-25E920D3FE64}
{0C371D65-7EF9-44EA-8128-A105DA82A80E} = {F42F9C8E-7BD9-4687-9B63-AFFA461AF5C1}
{D1C2B0BB-1276-4146-A699-D1983AE8ED04} = {F823671B-3ECA-4AE6-86DA-25E920D3FE64}
{CD10E29A-725E-4BEF-9CFF-6C0E0A652926} = {D1C2B0BB-1276-4146-A699-D1983AE8ED04}
{1E4E1ED4-7701-4A05-A861-64461C3B1EE3} = {D1C2B0BB-1276-4146-A699-D1983AE8ED04}
{62CDFB27-3B02-4D4B-B789-8AAD5E20688A} = {D1C2B0BB-1276-4146-A699-D1983AE8ED04}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {93384647-528D-46C8-922C-8DB36A382F0B}

View File

@ -23,6 +23,7 @@
<ProjectReference Include="..\..\..\src\Microsoft.AutoGen\Contracts\Microsoft.AutoGen.Contracts.csproj" />
<ProjectReference Include="..\..\..\src\Microsoft.AutoGen\Core.Grpc\Microsoft.AutoGen.Core.Grpc.csproj" />
<ProjectReference Include="..\..\..\src\Microsoft.AutoGen\Core\Microsoft.AutoGen.Core.csproj" />
<ProjectReference Include="..\..\..\src\Microsoft.AutoGen\Core.Grpc\Microsoft.AutoGen.Core.Grpc.csproj" />
</ItemGroup>
<ItemGroup>

View File

@ -4,14 +4,22 @@
using Microsoft.AutoGen.Agents;
using Microsoft.AutoGen.Contracts;
using Microsoft.AutoGen.Core;
using Microsoft.AutoGen.Core.Grpc;
using Samples;
// Set up app builder for in-process runtime, allow message delivery to self, and add the Hello agent
AgentsAppBuilder appBuilder = new AgentsAppBuilder()
.UseInProcessRuntime(deliverToSelf: true)
.AddAgent<HelloAgent>("HelloAgent");
var appBuilder = new AgentsAppBuilder(); // Create app builder
// if we are using distributed, we need the AGENT_HOST var defined and then we will use the grpc runtime
if (Environment.GetEnvironmentVariable("AGENT_HOST") is string agentHost)
{
appBuilder.AddGrpcAgentWorker(agentHost)
.AddAgent<HelloAgent>("HelloAgent");
}
else
{
// Set up app builder for in-process runtime, allow message delivery to self, and add the Hello agent
appBuilder.UseInProcessRuntime(deliverToSelf: true).AddAgent<HelloAgent>("HelloAgent");
}
var app = await appBuilder.BuildAsync(); // Build the app
// Create a custom message type from proto and define message
NewMessageReceived message = new NewMessageReceived { Message = "Hello World!" };
await app.PublishMessageAsync(message, new TopicId("HelloTopic")); // Publish custom message (handler has been set in HelloAgent)
await app.WaitForShutdownAsync(); // Wait for shutdown from agent
var message = new NewMessageReceived { Message = "Hello World!" };
await app.PublishMessageAsync(message, new TopicId("HelloTopic")).ConfigureAwait(false); // Publish custom message (handler has been set in HelloAgent)
await app.WaitForShutdownAsync().ConfigureAwait(false); // Wait for shutdown from agent

View File

@ -1,6 +1,5 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// IHandleConsole.cs
using Google.Protobuf;
using Microsoft.AutoGen.Contracts;
namespace Microsoft.AutoGen.Agents;
@ -14,13 +13,12 @@ public interface IHandleConsole : IHandle<Output>, IHandle<Input>, IProcessIO
/// <summary>
/// Prototype for Publish Message Async method
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="message"></param>
/// <param name="topic"></param>
/// <param name="messageId"></param>
/// <param name="token"></param>
/// <param name="cancellationToken"></param>
/// <returns>ValueTask</returns>
ValueTask PublishMessageAsync<T>(T message, TopicId topic, string? messageId, CancellationToken token = default) where T : IMessage;
ValueTask PublishMessageAsync(object message, TopicId topic, string? messageId = null, CancellationToken cancellationToken = default);
/// <summary>
/// Receives events of type Output and writes them to the console
@ -39,7 +37,7 @@ public interface IHandleConsole : IHandle<Output>, IHandle<Input>, IProcessIO
{
Route = "console"
};
await PublishMessageAsync(evt, new TopicId("OutputWritten"), null, token: CancellationToken.None).ConfigureAwait(false);
await PublishMessageAsync(evt, new TopicId("OutputWritten"), null, cancellationToken: CancellationToken.None).ConfigureAwait(false);
}
/// <summary>
@ -60,6 +58,6 @@ public interface IHandleConsole : IHandle<Output>, IHandle<Input>, IProcessIO
{
Route = "console"
};
await PublishMessageAsync(evt, new TopicId("InputProcessed"), null, token: CancellationToken.None).ConfigureAwait(false);
await PublishMessageAsync(evt, new TopicId("InputProcessed"), null, cancellationToken: CancellationToken.None).ConfigureAwait(false);
}
}

View File

@ -1,7 +1,5 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// IHandleFileIO.cs
using Google.Protobuf;
using Microsoft.AutoGen.Contracts;
using Microsoft.Extensions.Logging;
@ -25,13 +23,12 @@ public interface IHandleFileIO : IHandle<Input>, IHandle<Output>, IProcessIO
/// <summary>
/// Prototype for Publish Message Async method
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="message"></param>
/// <param name="topic"></param>
/// <param name="messageId"></param>
/// <param name="token"></param>
/// <param name="cancellationToken"></param>
/// <returns>ValueTask</returns>
ValueTask PublishMessageAsync<T>(T message, TopicId topic, string? messageId, CancellationToken token = default) where T : IMessage;
ValueTask PublishMessageAsync(object message, TopicId topic, string? messageId = null, CancellationToken cancellationToken = default);
async ValueTask IHandle<Input>.HandleAsync(Input item, MessageContext messageContext)
{
@ -45,7 +42,7 @@ public interface IHandleFileIO : IHandle<Input>, IHandle<Output>, IProcessIO
{
Message = errorMessage
};
await PublishMessageAsync(err, new TopicId("IOError"), null, token: CancellationToken.None).ConfigureAwait(false);
await PublishMessageAsync(err, new TopicId("IOError"), null, cancellationToken: CancellationToken.None).ConfigureAwait(false);
return;
}
string content;
@ -58,7 +55,7 @@ public interface IHandleFileIO : IHandle<Input>, IHandle<Output>, IProcessIO
{
Route = Route
};
await PublishMessageAsync(evt, new TopicId("InputProcessed"), null, token: CancellationToken.None).ConfigureAwait(false);
await PublishMessageAsync(evt, new TopicId("InputProcessed"), null, cancellationToken: CancellationToken.None).ConfigureAwait(false);
}
async ValueTask IHandle<Output>.HandleAsync(Output item, MessageContext messageContext)
{
@ -70,6 +67,6 @@ public interface IHandleFileIO : IHandle<Input>, IHandle<Output>, IProcessIO
{
Route = Route
};
await PublishMessageAsync(evt, new TopicId("OutputWritten"), null, token: CancellationToken.None).ConfigureAwait(false);
await PublishMessageAsync(evt, new TopicId("OutputWritten"), null, cancellationToken: CancellationToken.None).ConfigureAwait(false);
}
}

View File

@ -28,10 +28,7 @@ public class ProtobufSerializationRegistry : IProtoSerializationRegistry
public void RegisterSerializer(Type type, IProtobufMessageSerializer serializer)
{
if (_serializers.ContainsKey(TypeNameResolver.ResolveTypeName(type)))
{
throw new InvalidOperationException($"Serializer already registered for {type.FullName}");
}
_serializers.TryAdd(TypeNameResolver.ResolveTypeName(type), serializer);
_serializers[TypeNameResolver.ResolveTypeName(type)] = serializer;
}
}

View File

@ -4,12 +4,40 @@ using System.Collections.Concurrent;
using Microsoft.AutoGen.Protobuf;
namespace Microsoft.AutoGen.RuntimeGateway.Grpc.Abstractions;
/// <summary>
/// Stores agent subscription information such as topic and prefix mappings,
/// and maintains an ETag for concurrency checks.
/// </summary>
public class AgentsRegistryState
{
/// <summary>
/// Maps each agent ID to the set of topics they subscribe to.
/// </summary>
public ConcurrentDictionary<string, HashSet<string>> AgentsToTopicsMap { get; set; } = [];
/// <summary>
/// Maps each agent ID to the set of topic prefixes they subscribe to.
/// </summary>
public ConcurrentDictionary<string, HashSet<string>> AgentsToTopicsPrefixMap { get; set; } = [];
/// <summary>
/// Maps each topic name to the set of agent types subscribed to it.
/// </summary>
public ConcurrentDictionary<string, HashSet<string>> TopicToAgentTypesMap { get; set; } = [];
/// <summary>
/// Maps each topic prefix to the set of agent types subscribed to it.
/// </summary>
public ConcurrentDictionary<string, HashSet<string>> TopicPrefixToAgentTypesMap { get; set; } = [];
/// <summary>
/// Stores subscriptions by GUID
/// </summary>
public ConcurrentDictionary<string, HashSet<Subscription>> GuidSubscriptionsMap { get; set; } = [];
/// <summary>
/// The concurrency ETag for identifying the registry's version or state.
/// </summary>
public string Etag { get; set; } = Guid.NewGuid().ToString();
}

View File

@ -0,0 +1,22 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// IMessageRegistryGrain.cs
using Microsoft.AutoGen.Contracts;
namespace Microsoft.AutoGen.RuntimeGateway.Grpc.Abstractions;
public interface IMessageRegistryGrain : IGrainWithIntegerKey
{
/// <summary>
/// Writes a message to the dead-letter queue for the given topic.
/// </summary>
Task WriteMessageAsync(string topic, CloudEvent message);
/// <summary>
/// Removes all messages for the given topic from the dead-letter queue.
/// </summary>
/// <param name="topic">The topic to remove messages for.</param>
/// <returns>A task representing the asynchronous operation, with the list of removed messages as the result.</returns>
Task<List<CloudEvent>> RemoveMessagesAsync(string topic);
}

View File

@ -0,0 +1,18 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// MessageRegistryState.cs
using System.Collections.Concurrent;
using Microsoft.AutoGen.Contracts;
namespace Microsoft.AutoGen.RuntimeGateway.Grpc.Abstractions;
/// <summary>
/// Holds a dead-letter queue by topic type.
/// </summary>
public class MessageRegistryState
{
/// <summary>
/// Dictionary mapping topic types to a list of CloudEvents that failed delivery.
/// </summary>
public ConcurrentDictionary<string, List<CloudEvent>> DeadLetterQueue { get; set; } = new();
}

View File

@ -18,8 +18,18 @@ public sealed class GrpcGateway : BackgroundService, IGateway
{
private static readonly TimeSpan s_agentResponseTimeout = TimeSpan.FromSeconds(30);
private readonly ILogger<GrpcGateway> _logger;
/// <summary>
/// The Orleans cluster client.
/// </summary>
private readonly IClusterClient _clusterClient;
/// <summary>
/// The Orleans Grain that manages the AgentRegistration, Subscription, and Gateways
/// </summary>
private readonly IRegistryGrain _gatewayRegistry;
/// <summary>
/// The Orleans Grain that manages the DeadLetterQueue and MessageBuffer
/// </summary>
private readonly IMessageRegistryGrain _messageRegistry;
private readonly IGateway _reference;
private readonly ConcurrentDictionary<string, List<GrpcWorkerConnection>> _supportedAgentTypes = [];
public readonly ConcurrentDictionary<string, GrpcWorkerConnection> _workers = new();
@ -37,6 +47,8 @@ public sealed class GrpcGateway : BackgroundService, IGateway
_clusterClient = clusterClient;
_reference = clusterClient.CreateObjectReference<IGateway>(this);
_gatewayRegistry = clusterClient.GetGrain<IRegistryGrain>(0);
_messageRegistry = clusterClient.GetGrain<IMessageRegistryGrain>(0);
}
/// <summary>
@ -148,6 +160,29 @@ public sealed class GrpcGateway : BackgroundService, IGateway
// We do not actually need to defer these, since we do not listen to ClientId on this for some reason...
// TODO: Fix this
await _gatewayRegistry.SubscribeAsync(request).ConfigureAwait(true);
var topic = request.Subscription.SubscriptionCase switch
{
Subscription.SubscriptionOneofCase.TypeSubscription
=> request.Subscription.TypeSubscription.TopicType,
Subscription.SubscriptionOneofCase.TypePrefixSubscription
=> request.Subscription.TypePrefixSubscription.TopicTypePrefix,
_ => null
};
if (!string.IsNullOrEmpty(topic))
{
var removedMessages = await _messageRegistry.RemoveMessagesAsync(topic);
if (removedMessages.Any())
{
_logger.LogInformation("Removed {Count} dead-letter messages for topic '{Topic}'.", removedMessages.Count, topic);
// now that someone is subscribed, dispatch the messages
foreach (var message in removedMessages)
{
await DispatchEventAsync(message).ConfigureAwait(true);
}
}
}
return new AddSubscriptionResponse { };
}
catch (Exception ex)
@ -311,7 +346,7 @@ public sealed class GrpcGateway : BackgroundService, IGateway
{
var registry = _clusterClient.GetGrain<IRegistryGrain>(0);
//intentionally blocking
var targetAgentTypes = await registry.GetSubscribedAndHandlingAgentsAsync(evt.Source, evt.Type).ConfigureAwait(true);
var targetAgentTypes = await registry.GetSubscribedAndHandlingAgentsAsync(evt.Type, evt.Source).ConfigureAwait(true);
if (targetAgentTypes is not null && targetAgentTypes.Count > 0)
{
targetAgentTypes = targetAgentTypes.Distinct().ToList();
@ -324,15 +359,26 @@ public sealed class GrpcGateway : BackgroundService, IGateway
var activeConnections = connections.Where(c => c.Completion?.IsCompleted == false).ToList();
foreach (var connection in activeConnections)
{
_logger.LogDebug("Dispatching event {Event} to connection {Connection}, for AgentType {AgentType}.", evt, connection, agentType);
tasks.Add(this.WriteResponseAsync(connection, evt, cancellationToken));
}
}
else
{
// we have target agent types that aren't in the supported agent types
// could be a race condition or a bug
_logger.LogWarning($"Agent type {agentType} is not supported, but registry returned it as subscribed to {evt.Type}/{evt.Source}. Buffering an event to the dead-letter queue.");
await _messageRegistry.WriteMessageAsync(evt.Source, evt).ConfigureAwait(true);
}
}
await Task.WhenAll(tasks).ConfigureAwait(false);
}
else
{
// log that no agent types were found
_logger.LogWarning("No agent types found for event type {EventType}.", evt.Type);
_logger.LogWarning("No agent types found for event type {EventType}. Adding to Dead Letter Queue", evt.Type);
// buffer the event to the dead-letter queue
await _messageRegistry.WriteMessageAsync(evt.Source, evt).ConfigureAwait(true);
}
}
@ -423,28 +469,6 @@ public sealed class GrpcGateway : BackgroundService, IGateway
throw new RpcException(new Status(StatusCode.InvalidArgument, error));
}
/// <summary>
/// Dispatches an event to the specified agent types.
/// </summary>
/// <param name="agentTypes">The agent types.</param>
/// <param name="evt">The cloud event.</param>
/// <returns>A task that represents the asynchronous operation.</returns>
private async ValueTask DispatchEventToAgentsAsync(IEnumerable<string> agentTypes, CloudEvent evt)
{
var tasks = new List<Task>(agentTypes.Count());
foreach (var agentType in agentTypes)
{
if (_supportedAgentTypes.TryGetValue(agentType, out var connections))
{
foreach (var connection in connections)
{
tasks.Add(this.WriteResponseAsync(connection, evt));
}
}
}
await Task.WhenAll(tasks).ConfigureAwait(false);
}
/// <summary>
/// Writes a response to a worker connection.
/// </summary>
@ -456,15 +480,4 @@ public sealed class GrpcGateway : BackgroundService, IGateway
{
await connection.ResponseStream.WriteAsync(new Message { CloudEvent = cloudEvent }, cancellationToken).ConfigureAwait(false);
}
/// <summary>
/// Writes a response to a worker connection.
/// </summary>
/// <param name="connection">The worker connection.</param>
/// <param name="cloudEvent">The cloud event.</param>
/// <returns>A task that represents the asynchronous operation.</returns>
public async Task WriteResponseAsync(IConnection connection, CloudEvent cloudEvent)
{
await WriteResponseAsync((GrpcWorkerConnection)connection, cloudEvent, default).ConfigureAwait(false);
}
}

View File

@ -0,0 +1,56 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// MessageRegistryGrain.cs
using Microsoft.AutoGen.Contracts;
using Microsoft.AutoGen.RuntimeGateway.Grpc.Abstractions;
using Microsoft.Extensions.Logging;
namespace Microsoft.AutoGen.RuntimeGateway.Grpc;
internal sealed class MessageRegistryGrain(
[PersistentState("state", "PubSubStore")] IPersistentState<MessageRegistryState> state,
ILogger<MessageRegistryGrain> logger
) : Grain, IMessageRegistryGrain
{
private const int _retries = 5;
private readonly ILogger<MessageRegistryGrain> _logger = logger;
public async Task WriteMessageAsync(string topic, CloudEvent message)
{
var retries = _retries;
while (!await WriteMessageAsync(topic, message, state.Etag).ConfigureAwait(false))
{
if (retries-- <= 0)
{
throw new InvalidOperationException($"Failed to write MessageRegistryState after {_retries} retries.");
}
_logger.LogWarning("Failed to write MessageRegistryState. Retrying...");
retries--;
}
}
private async ValueTask<bool> WriteMessageAsync(string topic, CloudEvent message, string etag)
{
if (state.Etag != null && state.Etag != etag)
{
return false;
}
var queue = state.State.DeadLetterQueue.GetOrAdd(topic, _ => new());
queue.Add(message);
state.State.DeadLetterQueue.AddOrUpdate(topic, queue, (_, _) => queue);
await state.WriteStateAsync().ConfigureAwait(true);
return true;
}
public async Task<List<CloudEvent>> RemoveMessagesAsync(string topic)
{
if (state.State.DeadLetterQueue != null && state.State.DeadLetterQueue.Remove(topic, out List<CloudEvent>? letters))
{
await state.WriteStateAsync().ConfigureAwait(true);
if (letters != null)
{
return letters;
}
}
return [];
}
}

View File

@ -0,0 +1,35 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// AnySurrogate.cs
using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;
namespace Microsoft.AutoGen.RuntimeGateway.Grpc.Orleans.Surrogates;
[GenerateSerializer]
[Alias("Microsoft.AutoGen.RuntimeGateway.Grpc.Orleans.Surrogates.AnySurrogate")]
public struct AnySurrogate
{
[Id(0)]
public string TypeUrl;
[Id(1)]
public byte[] Value;
}
[RegisterConverter]
public sealed class AnySurrogateConverter : IConverter<Any, AnySurrogate>
{
public Any ConvertFromSurrogate(in AnySurrogate surrogate) =>
new()
{
TypeUrl = surrogate.TypeUrl,
Value = ByteString.CopyFrom(surrogate.Value)
};
public AnySurrogate ConvertToSurrogate(in Any value) =>
new()
{
TypeUrl = value.TypeUrl,
Value = value.Value.ToByteArray()
};
}

View File

@ -0,0 +1,49 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// HelloAgent.cs
using Microsoft.AutoGen.Agents;
using Microsoft.AutoGen.Contracts;
using Microsoft.AutoGen.Core;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace Samples;
[TypeSubscription("HelloTopic")]
public class HelloAgent(
IHostApplicationLifetime hostApplicationLifetime,
AgentId id,
IAgentRuntime runtime,
Logger<BaseAgent>? logger = null) : BaseAgent(id, runtime, "Hello Agent", logger),
IHandle<NewMessageReceived>,
IHandle<ConversationClosed>,
IHandle<Shutdown>, IHandleConsole
{
// This will capture the message sent in Program.cs
public async ValueTask HandleAsync(NewMessageReceived item, MessageContext messageContext)
{
Console.Out.WriteLine(item.Message); // Print message to console
ConversationClosed goodbye = new ConversationClosed
{
UserId = this.Id.Type,
UserMessage = "Goodbye"
};
// This will publish the new message type which will be handled by the ConversationClosed handler
await this.PublishMessageAsync(goodbye, new TopicId("HelloTopic"));
}
public async ValueTask HandleAsync(ConversationClosed item, MessageContext messageContext)
{
var goodbye = $"{item.UserId} said {item.UserMessage}"; // Print goodbye message to console
Console.Out.WriteLine(goodbye);
if (Environment.GetEnvironmentVariable("STAY_ALIVE_ON_GOODBYE") != "true")
{
// Publish message that will be handled by shutdown handler
await this.PublishMessageAsync(new Shutdown(), new TopicId("HelloTopic"));
}
}
public async ValueTask HandleAsync(Shutdown item, MessageContext messageContext)
{
Console.WriteLine("Shutting down...");
hostApplicationLifetime.StopApplication(); // Shuts down application
}
}

View File

@ -0,0 +1,32 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<None Update="appsettings.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Hosting" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Google.Protobuf" />
<PackageReference Include="Grpc.Tools" PrivateAssets="All" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\..\src\Microsoft.AutoGen\Contracts\Microsoft.AutoGen.Contracts.csproj" />
<ProjectReference Include="..\..\..\src\Microsoft.AutoGen\Core\Microsoft.AutoGen.Core.csproj" />
<ProjectReference Include="..\..\..\src\Microsoft.AutoGen\Agents\Microsoft.AutoGen.Agents.csproj" />
<ProjectReference Include="..\..\..\src\Microsoft.AutoGen\Core.Grpc\Microsoft.AutoGen.Core.Grpc.csproj" />
</ItemGroup>
<ItemGroup>
<!--Protobuf Include="..\protos\agent_events.proto" Link="protos\agent_events.proto" /-->
</ItemGroup>
</Project>

View File

@ -0,0 +1,28 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Program.cs
using Microsoft.AutoGen.Agents;
using Microsoft.AutoGen.Contracts;
using Microsoft.AutoGen.Core;
using Microsoft.AutoGen.Core.Grpc;
using Samples;
var appBuilder = new AgentsAppBuilder(); // Create app builder
// if we are using distributed, we need the AGENT_HOST var defined and then we will use the grpc runtime
if (Environment.GetEnvironmentVariable("AGENT_HOST") != null)
{
appBuilder.AddGrpcAgentWorker(
Environment.GetEnvironmentVariable("AGENT_HOST"))
.AddAgent<HelloAgent>("HelloAgent");
}
else
{
// Set up app builder for in-process runtime, allow message delivery to self, and add the Hello agent
appBuilder.UseInProcessRuntime(deliverToSelf: true).AddAgent<HelloAgent>("HelloAgent");
}
var app = await appBuilder.BuildAsync(); // Build the app
// Create a custom message type from proto and define message
var message = new NewMessageReceived { Message = "Hello World!" };
await app.PublishMessageAsync(message, new TopicId("HelloTopic", "HelloAgents/dotnet")).ConfigureAwait(false); // Publish custom message (handler has been set in HelloAgent)
//await app.PublishMessageAsync(message, new TopicId("HelloTopic")).ConfigureAwait(false); // Publish custom message (handler has been set in HelloAgent)
await app.WaitForShutdownAsync().ConfigureAwait(false); // Wait for shutdown from agent

View File

@ -0,0 +1,12 @@
{
"profiles": {
"HelloAgent": {
"commandName": "Project",
"launchBrowser": true,
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
},
"applicationUrl": "https://localhost:53113;http://localhost:53114"
}
}
}

View File

@ -0,0 +1,120 @@
# AutoGen 0.4 .NET Hello World Sample
This [sample](Program.cs) demonstrates how to create a simple .NET console application that listens for an event and then orchestrates a series of actions in response.
## Prerequisites
To run this sample, you'll need: [.NET 8.0](https://dotnet.microsoft.com/en-us/) or later.
Also recommended is the [GitHub CLI](https://cli.github.com/).
## Instructions to run the sample
```bash
# Clone the repository
gh repo clone microsoft/autogen
cd dotnet/samples/Hello
dotnet run
```
## Key Concepts
This sample illustrates how to create your own agent that inherits from a base agent and listens for an event. It also shows how to use the SDK's App Runtime locally to start the agent and send messages.
Flow Diagram:
```mermaid
%%{init: {'theme':'forest'}}%%
graph LR;
A[Main] --> |"PublishEventAsync(NewMessage('World'))"| B{"Handle(NewMessageReceived item, CancellationToken cancellationToken = default)"}
B --> |"PublishEventAsync(Output('***Hello, World***'))"| C[ConsoleAgent]
C --> D{"WriteConsole()"}
B --> |"PublishEventAsync(ConversationClosed('Goodbye'))"| E{"Handle(ConversationClosed item, CancellationToken cancellationToken = default)"}
B --> |"PublishEventAsync(Output('***Goodbye***'))"| C
E --> F{"Shutdown()"}
```
### Writing Event Handlers
The heart of an autogen application are the event handlers. Agents select a ```TopicSubscription``` to listen for events on a specific topic. When an event is received, the agent's event handler is called with the event data.
Within that event handler you may optionally *emit* new events, which are then sent to the event bus for other agents to process. The EventTypes are declared gRPC ProtoBuf messages that are used to define the schema of the event. The default protos are available via the ```Microsoft.AutoGen.Contracts;``` namespace and are defined in [autogen/protos](/autogen/protos). The EventTypes are registered in the agent's constructor using the ```IHandle``` interface.
```csharp
TopicSubscription("HelloAgents")]
public class HelloAgent(
iAgentWorker worker,
[FromKeyedServices("AgentsMetadata")] AgentsMetadata typeRegistry) : ConsoleAgent(
worker,
typeRegistry),
ISayHello,
IHandle<NewMessageReceived>,
IHandle<ConversationClosed>
{
public async Task Handle(NewMessageReceived item, CancellationToken cancellationToken = default)
{
var response = await SayHello(item.Message).ConfigureAwait(false);
var evt = new Output
{
Message = response
}.ToCloudEvent(this.AgentId.Key);
await PublishEventAsync(evt).ConfigureAwait(false);
var goodbye = new ConversationClosed
{
UserId = this.AgentId.Key,
UserMessage = "Goodbye"
}.ToCloudEvent(this.AgentId.Key);
await PublishEventAsync(goodbye).ConfigureAwait(false);
}
```
### Inheritance and Composition
This sample also illustrates inheritance in AutoGen. The `HelloAgent` class inherits from `ConsoleAgent`, which is a base class that provides a `WriteConsole` method.
### Starting the Application Runtime
AuotoGen provides a flexible runtime ```Microsoft.AutoGen.Agents.App``` that can be started in a variety of ways. The `Program.cs` file demonstrates how to start the runtime locally and send a message to the agent all in one go using the ```App.PublishMessageAsync``` method.
```csharp
// send a message to the agent
var app = await App.PublishMessageAsync("HelloAgents", new NewMessageReceived
{
Message = "World"
}, local: true);
await App.RuntimeApp!.WaitForShutdownAsync();
await app.WaitForShutdownAsync();
```
### Sending Messages
The set of possible Messages is defined in gRPC ProtoBuf specs. These are then turned into C# classes by the gRPC tools. You can define your own Message types by creating a new .proto file in your project and including the gRPC tools in your ```.csproj``` file:
```proto
syntax = "proto3";
package devteam;
option csharp_namespace = "DevTeam.Shared";
message NewAsk {
string org = 1;
string repo = 2;
string ask = 3;
int64 issue_number = 4;
}
message ReadmeRequested {
string org = 1;
string repo = 2;
int64 issue_number = 3;
string ask = 4;
}
```
```xml
<ItemGroup>
<PackageReference Include="Google.Protobuf" />
<PackageReference Include="Grpc.Tools" PrivateAssets="All" />
<Protobuf Include="..\Protos\messages.proto" Link="Protos\messages.proto" />
</ItemGroup>
```
You can send messages using the [```Microsoft.AutoGen.Agents.AgentWorker``` class](autogen/dotnet/src/Microsoft.AutoGen/Agents/AgentWorker.cs). Messages are wrapped in [the CloudEvents specification](https://cloudevents.io) and sent to the event bus.

View File

@ -0,0 +1,19 @@
{
"Logging": {
"LogLevel": {
"Default": "Warning",
"Microsoft.Hosting.Lifetime": "Information",
"Microsoft.AspNetCore": "Information",
"Microsoft": "Information",
"Microsoft.Orleans": "Warning",
"Orleans.Runtime": "Error",
"Grpc": "Information"
}
},
"AllowedHosts": "*",
"Kestrel": {
"EndpointDefaults": {
"Protocols": "Http2"
}
}
}

View File

@ -16,6 +16,6 @@
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\..\samples\Hello\HelloAgent\HelloAgent.csproj" />
<ProjectReference Include="../HelloAgentTests/HelloAgentTests.csproj" />
</ItemGroup>
</Project>

View File

@ -4,7 +4,7 @@
using Microsoft.Extensions.Hosting;
var appHost = DistributedApplication.CreateBuilder();
appHost.AddProject<Projects.HelloAgent>("HelloAgentsDotNetInMemoryRuntime");
appHost.AddProject<Projects.HelloAgentTests>("HelloAgentsDotNetInMemoryRuntime");
var app = appHost.Build();
await app.StartAsync();
await app.WaitForShutdownAsync();

View File

@ -0,0 +1,42 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Program.cs
using Aspire.Hosting.Python;
using Microsoft.Extensions.Hosting;
const string pythonHelloAgentPath = "../core_xlang_hello_python_agent";
const string pythonHelloAgentPy = "hello_python_agent.py";
const string pythonVEnv = "../../../../python/.venv";
//Environment.SetEnvironmentVariable("XLANG_TEST_NO_DOTNET", "true");
//Environment.SetEnvironmentVariable("XLANG_TEST_NO_PYTHON", "true");
var builder = DistributedApplication.CreateBuilder(args);
var backend = builder.AddProject<Projects.Microsoft_AutoGen_AgentHost>("AgentHost").WithExternalHttpEndpoints();
IResourceBuilder<ProjectResource>? dotnet = null;
#pragma warning disable ASPIREHOSTINGPYTHON001 // Type is for evaluation purposes only and is subject to change or removal in future updates. Suppress this diagnostic to proceed.
IResourceBuilder<PythonAppResource>? python = null;
if (string.IsNullOrEmpty(Environment.GetEnvironmentVariable("XLANG_TEST_NO_DOTNET")))
{
dotnet = builder.AddProject<Projects.HelloAgentTests>("HelloAgentTestsDotNET")
.WithReference(backend)
.WithEnvironment("AGENT_HOST", backend.GetEndpoint("https"))
.WithEnvironment("STAY_ALIVE_ON_GOODBYE", "true")
.WaitFor(backend);
}
if (string.IsNullOrEmpty(Environment.GetEnvironmentVariable("XLANG_TEST_NO_PYTHON")))
{
// xlang is over http for now - in prod use TLS between containers
python = builder.AddPythonApp("HelloAgentTestsPython", pythonHelloAgentPath, pythonHelloAgentPy, pythonVEnv)
.WithReference(backend)
.WithEnvironment("AGENT_HOST", backend.GetEndpoint("http"))
.WithEnvironment("STAY_ALIVE_ON_GOODBYE", "true")
.WithEnvironment("GRPC_DNS_RESOLVER", "native")
.WithOtlpExporter()
.WaitFor(backend);
if (dotnet != null) { python.WaitFor(dotnet); }
}
#pragma warning restore ASPIREHOSTINGPYTHON001 // Type is for evaluation purposes only and is subject to change or removal in future updates. Suppress this diagnostic to proceed.
using var app = builder.Build();
await app.StartAsync();
var url = backend.GetEndpoint("http").Url;
Console.WriteLine("Backend URL: " + url);
if (dotnet != null) { Console.WriteLine("Dotnet Resource Projects.HelloAgentTests invoked as HelloAgentTestsDotNET"); }
if (python != null) { Console.WriteLine("Python Resource hello_python_agent.py invoked as HelloAgentTestsPython"); }
await app.WaitForShutdownAsync();

View File

@ -0,0 +1,43 @@
{
"profiles": {
"https": {
"commandName": "Project",
"launchBrowser": true,
"dotnetRunMessages": true,
"applicationUrl": "https://localhost:15887;http://localhost:15888",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development",
"DOTNET_ENVIRONMENT": "Development",
//"DOTNET_DASHBOARD_OTLP_ENDPOINT_URL": "https://localhost:16037",
"DOTNET_DASHBOARD_OTLP_HTTP_ENDPOINT_URL": "https://localhost:16038",
"DOTNET_RESOURCE_SERVICE_ENDPOINT_URL": "https://localhost:17037",
"DOTNET_ASPIRE_SHOW_DASHBOARD_RESOURCES": "true"
}
},
"http": {
"commandName": "Project",
"launchBrowser": true,
"dotnetRunMessages": true,
"applicationUrl": "http://localhost:15888",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development",
"DOTNET_ENVIRONMENT": "Development",
//"DOTNET_DASHBOARD_OTLP_ENDPOINT_URL": "http://localhost:16031",
"DOTNET_DASHBOARD_OTLP_HTTP_ENDPOINT_URL": "http://localhost:16032",
"DOTNET_RESOURCE_SERVICE_ENDPOINT_URL": "http://localhost:17031",
"DOTNET_ASPIRE_SHOW_DASHBOARD_RESOURCES": "true",
"ASPIRE_ALLOW_UNSECURED_TRANSPORT": "true"
}
},
"generate-manifest": {
"commandName": "Project",
"dotnetRunMessages": true,
"commandLineArgs": "--publisher manifest --output-path aspire-manifest.json",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development",
"DOTNET_ENVIRONMENT": "Development"
}
}
},
"$schema": "https://json.schemastore.org/launchsettings.json"
}

View File

@ -0,0 +1,24 @@
<Project Sdk="Microsoft.NET.Sdk">
<Sdk Name="Aspire.AppHost.Sdk" Version="9.0.0" />
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<IsAspireHost>true</IsAspireHost>
<UserSecretsId>ecb5cbe4-15d8-4120-8f18-d3ba4902915b</UserSecretsId>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Aspire.Hosting.AppHost" />
<PackageReference Include="Aspire.Hosting" />
<PackageReference Include="Aspire.Hosting.Python" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="../../../src/Microsoft.AutoGen/AgentHost/Microsoft.AutoGen.AgentHost.csproj" />
<ProjectReference Include="../HelloAgentTests/HelloAgentTests.csproj" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,14 @@
# Python and dotnet agents interoperability sample
This sample demonstrates how to create a Python agent that interacts with a .NET agent.
To run the sample, check out the autogen repository.
Then do the following:
1. Navigate to autogen/dotnet/samples/Hello/Hello.AppHost
2. Run `dotnet run` to start the .NET Aspire app host, which runs three projects:
- Backend (the .NET Agent Runtime)
- HelloAgent (the .NET Agent)
- this Python agent - hello_python_agent.py
3. The AppHost will start the Aspire dashboard on [https://localhost:15887](https://localhost:15887).
The Python agent will interact with the .NET agent by sending a message to the .NET runtime, which will relay the message to the .NET agent.

View File

@ -0,0 +1,75 @@
import asyncio
import logging
import os
import sys
# from protos.agents_events_pb2 import NewMessageReceived
from autogen_core import (
PROTOBUF_DATA_CONTENT_TYPE,
AgentId,
DefaultSubscription,
DefaultTopicId,
TypeSubscription,
try_get_known_serializers_for_type,
)
from autogen_ext.runtimes.grpc import GrpcWorkerAgentRuntime
# Add the local package directory to sys.path
thisdir = os.path.dirname(os.path.abspath(__file__))
sys.path.append(os.path.join(thisdir, "..", ".."))
from dotenv import load_dotenv # type: ignore # noqa: E402
from protos.agent_events_pb2 import NewMessageReceived, Output # type: ignore # noqa: E402
from user_input import UserProxy # type: ignore # noqa: E402
agnext_logger = logging.getLogger("autogen_core")
async def main() -> None:
load_dotenv()
agentHost = os.getenv("AGENT_HOST") or "http://localhost:50673"
# grpc python bug - can only use the hostname, not prefix - if hostname has a prefix we have to remove it:
if agentHost.startswith("http://"):
agentHost = agentHost[7:]
if agentHost.startswith("https://"):
agentHost = agentHost[8:]
agnext_logger.info("0")
agnext_logger.info(agentHost)
runtime = GrpcWorkerAgentRuntime(host_address=agentHost, payload_serialization_format=PROTOBUF_DATA_CONTENT_TYPE)
agnext_logger.info("1")
await runtime.start()
runtime.add_message_serializer(try_get_known_serializers_for_type(NewMessageReceived))
agnext_logger.info("2")
await UserProxy.register(runtime, "HelloAgent", lambda: UserProxy())
await runtime.add_subscription(DefaultSubscription(agent_type="HelloAgent"))
await runtime.add_subscription(TypeSubscription(topic_type="HelloTopic", agent_type="HelloAgent"))
await runtime.add_subscription(TypeSubscription(topic_type="agents.NewMessageReceived", agent_type="HelloAgent"))
await runtime.add_subscription(TypeSubscription(topic_type="agents.ConversationClosed", agent_type="HelloAgent"))
await runtime.add_subscription(TypeSubscription(topic_type="agents.Output", agent_type="HelloAgent"))
agnext_logger.info("3")
new_message = NewMessageReceived(message="Hello from Python!")
output_message = Output(message="^v^v^v---Wild Hello from Python!---^v^v^v")
await runtime.publish_message(
message=new_message,
topic_id=DefaultTopicId("HelloTopic", "HelloAgents/python"),
sender=AgentId("HelloAgents", "python"),
)
runtime.add_message_serializer(try_get_known_serializers_for_type(Output))
await runtime.publish_message(
message=output_message,
topic_id=DefaultTopicId("HelloTopic", "HelloAgents/python"),
sender=AgentId("HelloAgents", "python"),
)
await runtime.stop_when_signal()
# await runtime.stop_when_idle()
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
agnext_logger.setLevel(logging.DEBUG)
agnext_logger.log(logging.DEBUG, "Starting worker")
asyncio.run(main())

View File

@ -0,0 +1,8 @@
"""
The :mod:`autogen_core.worker.protos` module provides Google Protobuf classes for agent-worker communication
"""
import os
import sys
sys.path.insert(0, os.path.abspath(os.path.dirname(__file__)))

View File

@ -0,0 +1,59 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# NO CHECKED-IN PROTOBUF GENCODE
# source: agent_events.proto
# Protobuf Python Version: 5.29.0
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import runtime_version as _runtime_version
from google.protobuf import symbol_database as _symbol_database
from google.protobuf.internal import builder as _builder
_runtime_version.ValidateProtobufRuntimeVersion(
_runtime_version.Domain.PUBLIC,
5,
29,
0,
'',
'agent_events.proto'
)
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x12\x61gent_events.proto\x12\x06\x61gents\"2\n\x0bTextMessage\x12\x13\n\x0btextMessage\x18\x01 \x01(\t\x12\x0e\n\x06source\x18\x02 \x01(\t\"\x18\n\x05Input\x12\x0f\n\x07message\x18\x01 \x01(\t\"\x1f\n\x0eInputProcessed\x12\r\n\x05route\x18\x01 \x01(\t\"\x19\n\x06Output\x12\x0f\n\x07message\x18\x01 \x01(\t\"\x1e\n\rOutputWritten\x12\r\n\x05route\x18\x01 \x01(\t\"\x1a\n\x07IOError\x12\x0f\n\x07message\x18\x01 \x01(\t\"%\n\x12NewMessageReceived\x12\x0f\n\x07message\x18\x01 \x01(\t\"%\n\x11ResponseGenerated\x12\x10\n\x08response\x18\x01 \x01(\t\"\x1a\n\x07GoodBye\x12\x0f\n\x07message\x18\x01 \x01(\t\" \n\rMessageStored\x12\x0f\n\x07message\x18\x01 \x01(\t\";\n\x12\x43onversationClosed\x12\x0f\n\x07user_id\x18\x01 \x01(\t\x12\x14\n\x0cuser_message\x18\x02 \x01(\t\"\x1b\n\x08Shutdown\x12\x0f\n\x07message\x18\x01 \x01(\tB\x1b\xaa\x02\x18Microsoft.AutoGen.Agentsb\x06proto3')
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'agent_events_pb2', _globals)
if not _descriptor._USE_C_DESCRIPTORS:
_globals['DESCRIPTOR']._loaded_options = None
_globals['DESCRIPTOR']._serialized_options = b'\252\002\030Microsoft.AutoGen.Agents'
_globals['_TEXTMESSAGE']._serialized_start=30
_globals['_TEXTMESSAGE']._serialized_end=80
_globals['_INPUT']._serialized_start=82
_globals['_INPUT']._serialized_end=106
_globals['_INPUTPROCESSED']._serialized_start=108
_globals['_INPUTPROCESSED']._serialized_end=139
_globals['_OUTPUT']._serialized_start=141
_globals['_OUTPUT']._serialized_end=166
_globals['_OUTPUTWRITTEN']._serialized_start=168
_globals['_OUTPUTWRITTEN']._serialized_end=198
_globals['_IOERROR']._serialized_start=200
_globals['_IOERROR']._serialized_end=226
_globals['_NEWMESSAGERECEIVED']._serialized_start=228
_globals['_NEWMESSAGERECEIVED']._serialized_end=265
_globals['_RESPONSEGENERATED']._serialized_start=267
_globals['_RESPONSEGENERATED']._serialized_end=304
_globals['_GOODBYE']._serialized_start=306
_globals['_GOODBYE']._serialized_end=332
_globals['_MESSAGESTORED']._serialized_start=334
_globals['_MESSAGESTORED']._serialized_end=366
_globals['_CONVERSATIONCLOSED']._serialized_start=368
_globals['_CONVERSATIONCLOSED']._serialized_end=427
_globals['_SHUTDOWN']._serialized_start=429
_globals['_SHUTDOWN']._serialized_end=456
# @@protoc_insertion_point(module_scope)

View File

@ -0,0 +1,197 @@
"""
@generated by mypy-protobuf. Do not edit manually!
isort:skip_file
"""
import builtins
import google.protobuf.descriptor
import google.protobuf.message
import typing
DESCRIPTOR: google.protobuf.descriptor.FileDescriptor
@typing.final
class TextMessage(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
TEXTMESSAGE_FIELD_NUMBER: builtins.int
SOURCE_FIELD_NUMBER: builtins.int
textMessage: builtins.str
source: builtins.str
def __init__(
self,
*,
textMessage: builtins.str = ...,
source: builtins.str = ...,
) -> None: ...
def ClearField(self, field_name: typing.Literal["source", b"source", "textMessage", b"textMessage"]) -> None: ...
global___TextMessage = TextMessage
@typing.final
class Input(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
MESSAGE_FIELD_NUMBER: builtins.int
message: builtins.str
def __init__(
self,
*,
message: builtins.str = ...,
) -> None: ...
def ClearField(self, field_name: typing.Literal["message", b"message"]) -> None: ...
global___Input = Input
@typing.final
class InputProcessed(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
ROUTE_FIELD_NUMBER: builtins.int
route: builtins.str
def __init__(
self,
*,
route: builtins.str = ...,
) -> None: ...
def ClearField(self, field_name: typing.Literal["route", b"route"]) -> None: ...
global___InputProcessed = InputProcessed
@typing.final
class Output(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
MESSAGE_FIELD_NUMBER: builtins.int
message: builtins.str
def __init__(
self,
*,
message: builtins.str = ...,
) -> None: ...
def ClearField(self, field_name: typing.Literal["message", b"message"]) -> None: ...
global___Output = Output
@typing.final
class OutputWritten(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
ROUTE_FIELD_NUMBER: builtins.int
route: builtins.str
def __init__(
self,
*,
route: builtins.str = ...,
) -> None: ...
def ClearField(self, field_name: typing.Literal["route", b"route"]) -> None: ...
global___OutputWritten = OutputWritten
@typing.final
class IOError(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
MESSAGE_FIELD_NUMBER: builtins.int
message: builtins.str
def __init__(
self,
*,
message: builtins.str = ...,
) -> None: ...
def ClearField(self, field_name: typing.Literal["message", b"message"]) -> None: ...
global___IOError = IOError
@typing.final
class NewMessageReceived(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
MESSAGE_FIELD_NUMBER: builtins.int
message: builtins.str
def __init__(
self,
*,
message: builtins.str = ...,
) -> None: ...
def ClearField(self, field_name: typing.Literal["message", b"message"]) -> None: ...
global___NewMessageReceived = NewMessageReceived
@typing.final
class ResponseGenerated(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
RESPONSE_FIELD_NUMBER: builtins.int
response: builtins.str
def __init__(
self,
*,
response: builtins.str = ...,
) -> None: ...
def ClearField(self, field_name: typing.Literal["response", b"response"]) -> None: ...
global___ResponseGenerated = ResponseGenerated
@typing.final
class GoodBye(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
MESSAGE_FIELD_NUMBER: builtins.int
message: builtins.str
def __init__(
self,
*,
message: builtins.str = ...,
) -> None: ...
def ClearField(self, field_name: typing.Literal["message", b"message"]) -> None: ...
global___GoodBye = GoodBye
@typing.final
class MessageStored(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
MESSAGE_FIELD_NUMBER: builtins.int
message: builtins.str
def __init__(
self,
*,
message: builtins.str = ...,
) -> None: ...
def ClearField(self, field_name: typing.Literal["message", b"message"]) -> None: ...
global___MessageStored = MessageStored
@typing.final
class ConversationClosed(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
USER_ID_FIELD_NUMBER: builtins.int
USER_MESSAGE_FIELD_NUMBER: builtins.int
user_id: builtins.str
user_message: builtins.str
def __init__(
self,
*,
user_id: builtins.str = ...,
user_message: builtins.str = ...,
) -> None: ...
def ClearField(self, field_name: typing.Literal["user_id", b"user_id", "user_message", b"user_message"]) -> None: ...
global___ConversationClosed = ConversationClosed
@typing.final
class Shutdown(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
MESSAGE_FIELD_NUMBER: builtins.int
message: builtins.str
def __init__(
self,
*,
message: builtins.str = ...,
) -> None: ...
def ClearField(self, field_name: typing.Literal["message", b"message"]) -> None: ...
global___Shutdown = Shutdown

View File

@ -0,0 +1,24 @@
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc
import warnings
GRPC_GENERATED_VERSION = '1.70.0'
GRPC_VERSION = grpc.__version__
_version_not_supported = False
try:
from grpc._utilities import first_version_is_lower
_version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION)
except ImportError:
_version_not_supported = True
if _version_not_supported:
raise RuntimeError(
f'The grpc package installed is at version {GRPC_VERSION},'
+ f' but the generated code in agent_events_pb2_grpc.py depends on'
+ f' grpcio>={GRPC_GENERATED_VERSION}.'
+ f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}'
+ f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.'
)

View File

@ -0,0 +1,17 @@
"""
@generated by mypy-protobuf. Do not edit manually!
isort:skip_file
"""
import abc
import collections.abc
import grpc
import grpc.aio
import typing
_T = typing.TypeVar("_T")
class _MaybeAsyncIterator(collections.abc.AsyncIterator[_T], collections.abc.Iterator[_T], metaclass=abc.ABCMeta): ...
class _ServicerContext(grpc.ServicerContext, grpc.aio.ServicerContext): # type: ignore[misc, type-arg]
...

View File

@ -0,0 +1,38 @@
import asyncio
import logging
from typing import Union
from autogen_core import DefaultTopicId, MessageContext, RoutedAgent, message_handler
from protos.agent_events_pb2 import ConversationClosed, Input, NewMessageReceived, Output # type: ignore
input_types = Union[ConversationClosed, Input, Output]
class UserProxy(RoutedAgent):
"""An agent that allows the user to play the role of an agent in the conversation via input."""
DEFAULT_DESCRIPTION = "A human user."
def __init__(
self,
description: str = DEFAULT_DESCRIPTION,
) -> None:
super().__init__(description)
@message_handler
async def handle_user_chat_input(self, message: input_types, ctx: MessageContext) -> None:
logger = logging.getLogger("autogen_core")
if isinstance(message, Input):
response = await self.ainput("User input ('exit' to quit): ")
response = response.strip()
logger.info(response)
await self.publish_message(NewMessageReceived(message=response), topic_id=DefaultTopicId())
elif isinstance(message, Output):
logger.info(message.message)
else:
pass
async def ainput(self, prompt: str) -> str:
return await asyncio.to_thread(input, f"{prompt} ")

View File

@ -0,0 +1,43 @@
syntax = "proto3";
package agents;
option csharp_namespace = "Microsoft.AutoGen.Contracts";
message TextMessage {
string textMessage = 1;
string source = 2;
}
message Input {
string message = 1;
}
message InputProcessed {
string route = 1;
}
message Output {
string message = 1;
}
message OutputWritten {
string route = 1;
}
message IOError {
string message = 1;
}
message NewMessageReceived {
string message = 1;
}
message ResponseGenerated {
string response = 1;
}
message GoodBye {
string message = 1;
}
message MessageStored {
string message = 1;
}
message ConversationClosed {
string user_id = 1;
string user_message = 2;
}
message Shutdown {
string message = 1;
}

View File

@ -8,10 +8,15 @@ namespace Microsoft.AutoGen.Integration.Tests;
public class HelloAppHostIntegrationTests(ITestOutputHelper testOutput)
{
[Theory, Trait("Category", "Integration")]
[MemberData(nameof(AppHostAssemblies))]
public async Task AppHostRunsCleanly(string appHostPath)
private const string AppHostAssemblyName = "XlangTests.AppHost";
private const string DotNetResourceName = "HelloAgentTestsDotNET";
private const string PythonResourceName = "HelloAgentTestsPython";
private const string BackendResourceName = "AgentHost";
[Fact]
public async Task AppHostRunsCleanly()
{
var appHostPath = GetAssemblyPath(AppHostAssemblyName);
var appHost = await DistributedApplicationTestFactory.CreateAsync(appHostPath, testOutput);
await using var app = await appHost.BuildAsync().WaitAsync(TimeSpan.FromSeconds(15));
@ -22,53 +27,157 @@ public class HelloAppHostIntegrationTests(ITestOutputHelper testOutput)
await app.StopAsync().WaitAsync(TimeSpan.FromSeconds(15));
}
[Theory, Trait("Category", "Integration")]
[MemberData(nameof(TestEndpoints))]
public async Task AppHostLogsHelloAgentE2E(TestEndpoints testEndpoints)
[Fact]
public async Task Test_Dotnet_Sends_AgentHost_Delivers_and_Python_Receives()
{
var appHostName = testEndpoints.AppHost!;
var appHostPath = $"{appHostName}.dll";
//Prepare
var appHostPath = GetAssemblyPath(AppHostAssemblyName);
var appHost = await DistributedApplicationTestFactory.CreateAsync(appHostPath, testOutput);
await using var app = await appHost.BuildAsync().WaitAsync(TimeSpan.FromSeconds(15));
await app.StartAsync().WaitAsync(TimeSpan.FromSeconds(120));
await app.WaitForResourcesAsync().WaitAsync(TimeSpan.FromSeconds(120));
if (testEndpoints.WaitForResources?.Count > 0)
await app.WaitForResourcesAsync(new[] { KnownResourceStates.Running }).WaitAsync(TimeSpan.FromSeconds(120));
//Act
var expectedMessage = "INFO:autogen_core:Received a message from host: cloudEvent {";
var containsExpectedMessage = false;
app.EnsureNoErrorsLogged();
containsExpectedMessage = await app.WaitForExpectedMessageInResourceLogs(PythonResourceName, expectedMessage, TimeSpan.FromSeconds(120));
expectedMessage = "Hello World!";
containsExpectedMessage = false;
containsExpectedMessage = await app.WaitForExpectedMessageInResourceLogs(PythonResourceName, expectedMessage, TimeSpan.FromSeconds(120));
await app.StopAsync();
//Assert
Assert.True(containsExpectedMessage);
}
[Fact]
public async Task Test_Python_Sends_AgentHost_Delivers_and_DotNet_Receives()
{
//Prepare
var appHostPath = GetAssemblyPath(AppHostAssemblyName);
var appHost = await DistributedApplicationTestFactory.CreateAsync(appHostPath, testOutput);
await using var app = await appHost.BuildAsync().WaitAsync(TimeSpan.FromSeconds(15));
await app.StartAsync().WaitAsync(TimeSpan.FromSeconds(120));
await app.WaitForResourcesAsync(new[] { KnownResourceStates.Running }).WaitAsync(TimeSpan.FromSeconds(120));
//Act
var expectedMessage = "from Python!";
var containsExpectedMessage = false;
app.EnsureNoErrorsLogged();
containsExpectedMessage = await app.WaitForExpectedMessageInResourceLogs(DotNetResourceName, expectedMessage, TimeSpan.FromSeconds(60));
await app.StopAsync();
//Assert
Assert.True(containsExpectedMessage);
}
[Fact]
public async Task Test_Python_Agent_Sends_And_AgentHost_Receives()
{
//Prepare
Environment.SetEnvironmentVariable("XLANG_TEST_NO_DOTNET", "true");
var appHostPath = GetAssemblyPath(AppHostAssemblyName);
var appHost = await DistributedApplicationTestFactory.CreateAsync(appHostPath, testOutput);
await using var app = await appHost.BuildAsync().WaitAsync(TimeSpan.FromSeconds(15));
await app.StartAsync().WaitAsync(TimeSpan.FromSeconds(120));
await app.WaitForResourcesAsync(new[] { KnownResourceStates.Running }).WaitAsync(TimeSpan.FromSeconds(120));
//Act
var expectedMessage = "\"source\": \"HelloAgents/python\"";
var containsExpectedMessage = false;
app.EnsureNoErrorsLogged();
containsExpectedMessage = await app.WaitForExpectedMessageInResourceLogs(BackendResourceName, expectedMessage, TimeSpan.FromSeconds(120));
await app.StopAsync();
//Assert
Assert.True(containsExpectedMessage);
}
[Fact]
public async Task Test_Dotnet_Agent_Sends_And_AgentHost_Receives()
{
//Prepare
Environment.SetEnvironmentVariable("XLANG_TEST_NO_PYTHON", "true");
var appHostPath = GetAssemblyPath(AppHostAssemblyName);
var appHost = await DistributedApplicationTestFactory.CreateAsync(appHostPath, testOutput);
await using var app = await appHost.BuildAsync().WaitAsync(TimeSpan.FromSeconds(15));
await app.StartAsync().WaitAsync(TimeSpan.FromSeconds(120));
await app.WaitForResourcesAsync(new[] { KnownResourceStates.Running }).WaitAsync(TimeSpan.FromSeconds(120));
//Act
var expectedMessage = "\"source\": \"HelloAgents/dotnet\"";
var containsExpectedMessage = false;
app.EnsureNoErrorsLogged();
containsExpectedMessage = await app.WaitForExpectedMessageInResourceLogs(BackendResourceName, expectedMessage, TimeSpan.FromSeconds(120));
await app.StopAsync();
//Assert
Assert.True(containsExpectedMessage);
}
[Fact]
public async Task Test_Dotnet_Agent_Sends_And_AgentHost_Delivers_Back_To_It()
{
//Prepare
Environment.SetEnvironmentVariable("XLANG_TEST_NO_PYTHON", "true");
var appHostPath = GetAssemblyPath(AppHostAssemblyName);
var appHost = await DistributedApplicationTestFactory.CreateAsync(appHostPath, testOutput);
await using var app = await appHost.BuildAsync().WaitAsync(TimeSpan.FromSeconds(15));
await app.StartAsync().WaitAsync(TimeSpan.FromSeconds(120));
await app.WaitForResourcesAsync(new[] { KnownResourceStates.Running }).WaitAsync(TimeSpan.FromSeconds(120));
//Act
var expectedMessage = "Hello World!";
var containsExpectedMessage = false;
app.EnsureNoErrorsLogged();
containsExpectedMessage = await app.WaitForExpectedMessageInResourceLogs(DotNetResourceName, expectedMessage, TimeSpan.FromSeconds(120));
expectedMessage = "HelloAgent said Goodbye";
containsExpectedMessage = false;
containsExpectedMessage = await app.WaitForExpectedMessageInResourceLogs(DotNetResourceName, expectedMessage, TimeSpan.FromSeconds(120));
await app.StopAsync();
//Assert
Assert.True(containsExpectedMessage);
}
[Fact]
public async Task Test_Python_Agent_Sends_And_AgentHost_Delivers_Back_To_It()
{
//Prepare
Environment.SetEnvironmentVariable("XLANG_TEST_NO_DOTNET", "true");
var appHostPath = GetAssemblyPath(AppHostAssemblyName);
var appHost = await DistributedApplicationTestFactory.CreateAsync(appHostPath, testOutput);
await using var app = await appHost.BuildAsync().WaitAsync(TimeSpan.FromSeconds(15));
await app.StartAsync().WaitAsync(TimeSpan.FromSeconds(120));
await app.WaitForResourcesAsync(new[] { KnownResourceStates.Running }).WaitAsync(TimeSpan.FromSeconds(120));
//Act
var expectedMessage = "INFO:autogen_core:Received a message from host: cloudEvent {";
var containsExpectedMessage = false;
app.EnsureNoErrorsLogged();
containsExpectedMessage = await app.WaitForExpectedMessageInResourceLogs(PythonResourceName, expectedMessage, TimeSpan.FromSeconds(120));
await app.StopAsync();
//Assert
Assert.True(containsExpectedMessage);
}
private static string GetAssemblyPath(string assemblyName)
{
var parentDir = Directory.GetParent(AppContext.BaseDirectory)?.FullName;
var grandParentDir = parentDir is not null ? Directory.GetParent(parentDir)?.FullName : null;
var greatGrandParentDir = grandParentDir is not null ? Directory.GetParent(grandParentDir)?.FullName : null
?? AppContext.BaseDirectory;
var options = new EnumerationOptions { RecurseSubdirectories = true, MatchCasing = MatchCasing.CaseInsensitive };
if (greatGrandParentDir is not null)
{
// Wait until each resource transitions to the required state
var timeout = TimeSpan.FromMinutes(5);
foreach (var (ResourceName, TargetState) in testEndpoints.WaitForResources)
var foundFile = Directory.GetFiles(greatGrandParentDir, $"{assemblyName}.dll", options).FirstOrDefault();
if (foundFile is not null)
{
await app.WaitForResource(ResourceName, TargetState).WaitAsync(timeout);
return foundFile;
}
}
//sleep to make sure the app is running
await Task.Delay(20000);
app.EnsureNoErrorsLogged();
app.EnsureLogContains("HelloAgent said Goodbye");
app.EnsureLogContains("Wild Hello from Python!");
await app.StopAsync().WaitAsync(TimeSpan.FromSeconds(15));
}
public static TheoryData<string> AppHostAssemblies()
{
var appHostAssemblies = GetSamplesAppHostAssemblyPaths();
var theoryData = new TheoryData<string, bool>();
return new(appHostAssemblies.Select(p => Path.GetRelativePath(AppContext.BaseDirectory, p)));
}
public static TheoryData<TestEndpoints> TestEndpoints() =>
new([
new TestEndpoints("Hello.AppHost", new() {
{ "backend", ["/"] }
}),
]);
private static IEnumerable<string> GetSamplesAppHostAssemblyPaths()
{
// All the AppHost projects are referenced by this project so we can find them by looking for all their assemblies in the base directory
return Directory.GetFiles(AppContext.BaseDirectory, "*.AppHost.dll")
.Where(fileName => !fileName.EndsWith("Aspire.Hosting.AppHost.dll", StringComparison.OrdinalIgnoreCase));
throw new FileNotFoundException($"Could not find {assemblyName}.dll in {grandParentDir ?? "unknown"} directory");
}
}

View File

@ -6,38 +6,26 @@ namespace Microsoft.AutoGen.Integration.Tests;
public class InMemoryRuntimeIntegrationTests(ITestOutputHelper testOutput)
{
[Theory, Trait("Category", "Integration")]
[MemberData(nameof(AppHostAssemblies))]
public async Task HelloAgentsE2EInMemory(string appHostAssemblyPath)
[Fact]
public async Task HelloAgentsE2EInMemory()
{
// Locate InMemoryTests.AppHost.dll in the test output folder
var appHostAssemblyPath = Directory.GetFiles(AppContext.BaseDirectory, "InMemoryTests.AppHost.dll", SearchOption.AllDirectories)
.FirstOrDefault()
?? throw new FileNotFoundException("Could not find InMemoryTests.AppHost.dll in the test output folder");
var appHost = await DistributedApplicationTestFactory.CreateAsync(appHostAssemblyPath, testOutput);
await using var app = await appHost.BuildAsync().WaitAsync(TimeSpan.FromSeconds(15));
// Start the application and wait for resources
await app.StartAsync().WaitAsync(TimeSpan.FromSeconds(120));
await app.WaitForResourcesAsync().WaitAsync(TimeSpan.FromSeconds(120));
await app.StartAsync().WaitAsync(TimeSpan.FromSeconds(120));
await app.WaitForResourcesAsync().WaitAsync(TimeSpan.FromSeconds(120));
//sleep 5 seconds to make sure the app is running
await Task.Delay(15000);
// Sleep 5 seconds to ensure the app is up
await Task.Delay(5000);
app.EnsureNoErrorsLogged();
app.EnsureLogContains("Hello World");
app.EnsureLogContains("HelloAgent said Goodbye");
await app.StopAsync().WaitAsync(TimeSpan.FromSeconds(15));
}
public static TheoryData<string> AppHostAssemblies()
{
var appHostAssemblies = GetSamplesAppHostAssemblyPaths();
var theoryData = new TheoryData<string, bool>();
return new(appHostAssemblies.Select(p => Path.GetRelativePath(AppContext.BaseDirectory, p)));
}
private static IEnumerable<string> GetSamplesAppHostAssemblyPaths()
{
// All the AppHost projects are referenced by this project so we can find them by looking for all their assemblies in the base directory
return Directory.GetFiles(AppContext.BaseDirectory, "HelloAgent.AppHost.dll")
.Where(fileName => !fileName.EndsWith("Aspire.Hosting.AppHost.dll", StringComparison.OrdinalIgnoreCase));
}
}

View File

@ -1,8 +1,8 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// DistributedApplicationExtension.cs
using System.Diagnostics;
using System.Security.Cryptography;
using Aspire.Hosting;
using Aspire.Hosting.Python;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
@ -185,6 +185,19 @@ public static partial class DistributedApplicationExtensions
return (appHostLogs, resourceLogs);
}
/// <summary>
/// Gets the logs for the specified resource.
/// </summary>
/// <param name="app">The DistributedApplication</param>
/// <param name="resourceName">The name of the resource</param>
/// <returns>List<FakeLogRecord></returns>
public static IReadOnlyList<FakeLogRecord> GetResourceLogs(this DistributedApplication app, string resourceName)
{
var environment = app.Services.GetRequiredService<IHostEnvironment>();
var logCollector = app.Services.GetFakeLogCollector();
return logCollector.GetSnapshot().Where(l => l.Category == $"{environment.ApplicationName}.Resources.{resourceName}").ToList();
}
/// <summary>
/// Get all logs from the whole test run.
/// </summary>
@ -240,6 +253,58 @@ public static partial class DistributedApplicationExtensions
Assert.Contains(resourceLogs, log => log.Message.Contains(message));
}
/// <summary>
/// WaitForExpectedMessageInLogs
/// </summary>
/// <param name="app">DistributedApplication</param>
/// <param name="expectedMessage">string</param>
/// <param name="timeout">TimeSpan</param>
public static async Task<bool> WaitForExpectedMessageInResourceLogs(this DistributedApplication app, string resourceName, string expectedMessage, TimeSpan timeout)
{
var containsExpectedMessage = false;
var logWatchCancellation = new CancellationTokenSource();
var logWatchTask = Task.Run(async () =>
{
while (!containsExpectedMessage)
{
var logs = app.GetResourceLogs(resourceName);
if (logs != null && logs.Any(log => log.Message.Contains(expectedMessage)))
{
containsExpectedMessage = true;
logWatchCancellation.Cancel();
}
}
}, logWatchCancellation.Token).WaitAsync(timeout);
try
{
await logWatchTask.ConfigureAwait(true);
}
catch (OperationCanceledException)
{
// Task was cancelled, which means the expected message was found
}
catch (Exception ex)
{
if (Debugger.IsAttached)
{
var logs = app.GetResourceLogs(resourceName);
foreach (var log in logs)
{
Console.WriteLine(log.Message);
}
var environment = app.Services.GetRequiredService<IHostEnvironment>();
var logCollector = app.Services.GetFakeLogCollector();
var allLogs = logCollector.GetSnapshot();
}
throw new Exception($"Failed to find expected message '{expectedMessage}' in logs for resource '{resourceName}' within the timeout period.", ex);
}
finally
{
logWatchCancellation.Cancel();
}
return containsExpectedMessage;
}
/// <summary>
/// Creates an <see cref="HttpClient"/> configured to communicate with the specified resource.
/// </summary>

View File

@ -26,11 +26,16 @@
<Using Include="Aspire.Hosting.Testing" />
<Using Include="Xunit" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Aspire.Hosting.AppHost" />
<PackageReference Include="Aspire.Hosting" />
<PackageReference Include="Aspire.Hosting.Python" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\samples\Hello\Hello.AppHost\Hello.AppHost.csproj" />
<ProjectReference Include="..\..\samples\Hello\HelloAgent\HelloAgent.csproj" />
<ProjectReference Include="..\Microsoft.AutoGen.Integration.Tests.AppHosts\HelloAgent.AppHost\HelloAgent.AppHost.csproj" />
<ProjectReference Include="../../src/Microsoft.AutoGen/AgentHost/Microsoft.AutoGen.AgentHost.csproj" />
<ProjectReference Include="../Microsoft.AutoGen.Integration.Tests.AppHosts/HelloAgentTests/HelloAgentTests.csproj" />
<ProjectReference Include="../Microsoft.AutoGen.Integration.Tests.AppHosts/InMemoryTests.AppHost/InMemoryTests.AppHost.csproj" />
</ItemGroup>
<!-- Properties, Items, and targets to ensure Python apps are initialized -->

View File

@ -0,0 +1,43 @@
{
"profiles": {
"https": {
"commandName": "Project",
"launchBrowser": true,
"dotnetRunMessages": true,
"applicationUrl": "https://localhost:15887;http://localhost:15888",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development",
"DOTNET_ENVIRONMENT": "Development",
//"DOTNET_DASHBOARD_OTLP_ENDPOINT_URL": "https://localhost:16037",
"DOTNET_DASHBOARD_OTLP_HTTP_ENDPOINT_URL": "https://localhost:16038",
"DOTNET_RESOURCE_SERVICE_ENDPOINT_URL": "https://localhost:17037",
"DOTNET_ASPIRE_SHOW_DASHBOARD_RESOURCES": "true"
}
},
"http": {
"commandName": "Project",
"launchBrowser": true,
"dotnetRunMessages": true,
"applicationUrl": "http://localhost:15888",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development",
"DOTNET_ENVIRONMENT": "Development",
//"DOTNET_DASHBOARD_OTLP_ENDPOINT_URL": "http://localhost:16031",
"DOTNET_DASHBOARD_OTLP_HTTP_ENDPOINT_URL": "http://localhost:16032",
"DOTNET_RESOURCE_SERVICE_ENDPOINT_URL": "http://localhost:17031",
"DOTNET_ASPIRE_SHOW_DASHBOARD_RESOURCES": "true",
"ASPIRE_ALLOW_UNSECURED_TRANSPORT": "true"
}
},
"generate-manifest": {
"commandName": "Project",
"dotnetRunMessages": true,
"commandLineArgs": "--publisher manifest --output-path aspire-manifest.json",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development",
"DOTNET_ENVIRONMENT": "Development"
}
}
},
"$schema": "https://json.schemastore.org/launchsettings.json"
}

View File

@ -0,0 +1,40 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// MessageRegistryTests.cs
using Microsoft.AutoGen.Contracts;
using Microsoft.AutoGen.RuntimeGateway.Grpc.Abstractions;
using Microsoft.AutoGen.RuntimeGateway.Grpc.Tests.Helpers.Orleans;
using Orleans.TestingHost;
namespace Microsoft.AutoGen.RuntimeGateway.Grpc.Tests;
public class MessageRegistryTests : IClassFixture<ClusterFixture>
{
private readonly TestCluster _cluster;
public MessageRegistryTests(ClusterFixture fixture)
{
_cluster = fixture.Cluster;
}
[Fact]
public async Task Write_and_Remove_Messages()
{
// Arrange
var grain = _cluster.GrainFactory.GetGrain<IMessageRegistryGrain>(0);
var topic = Guid.NewGuid().ToString(); // Random topic
var message = new CloudEvent { Id = Guid.NewGuid().ToString(), Source = "test-source", Type = "test-type" };
// Act
await grain.WriteMessageAsync(topic, message);
// Assert
// attempt to remove the topic from the queue
var removedMessages = await grain.RemoveMessagesAsync(topic);
// attempt to compare the message with the removed message
Assert.Single(removedMessages);
Assert.Equal(message.Id, removedMessages[0].Id);
// ensure the queue is empty
removedMessages = await grain.RemoveMessagesAsync(topic);
Assert.Empty(removedMessages);
}
}

View File

@ -44,23 +44,24 @@ async def main() -> None:
await UserProxy.register(runtime, "HelloAgent", lambda: UserProxy())
await runtime.add_subscription(DefaultSubscription(agent_type="HelloAgent"))
await runtime.add_subscription(TypeSubscription(topic_type="HelloTopic", agent_type="HelloAgent"))
await runtime.add_subscription(TypeSubscription(topic_type="agents.NewMessageReceived", agent_type="HelloAgent"))
await runtime.add_subscription(TypeSubscription(topic_type="agents.ConversationClosed", agent_type="HelloAgent"))
await runtime.add_subscription(TypeSubscription(topic_type="agents.Output", agent_type="HelloAgent"))
agnext_logger.info("3")
new_message = NewMessageReceived(message="from Python!")
new_message = NewMessageReceived(message="Hello from Python!")
output_message = Output(message="^v^v^v---Wild Hello from Python!---^v^v^v")
await runtime.publish_message(
message=new_message,
topic_id=DefaultTopicId("agents.NewMessageReceived", "HelloAgents/python"),
topic_id=DefaultTopicId("HelloTopic", "HelloAgents/python"),
sender=AgentId("HelloAgents", "python"),
)
runtime.add_message_serializer(try_get_known_serializers_for_type(Output))
await runtime.publish_message(
message=output_message,
topic_id=DefaultTopicId("agents.Output", "HelloAgents"),
topic_id=DefaultTopicId("HelloTopic", "HelloAgents/python"),
sender=AgentId("HelloAgents", "python"),
)
await runtime.stop_when_signal()