diff --git a/dotnet/AGNext.sln b/dotnet/AGNext.sln index 316c78ea6..fbf8179bc 100644 --- a/dotnet/AGNext.sln +++ b/dotnet/AGNext.sln @@ -38,6 +38,20 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution .editorconfig = .editorconfig EndProjectSection EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.AI.Agents.Worker.Client", "src\Microsoft.AI.Agents.Worker.Client\Microsoft.AI.Agents.Worker.Client.csproj", "{20E5C8C3-CE40-4FC3-96F8-B4A2C51936E9}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.AI.Agents.Worker.Server", "src\Microsoft.AI.Agents.Worker.Server\Microsoft.AI.Agents.Worker.Server.csproj", "{B9188ADC-D322-4B38-B3D6-95338E89C34B}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "greeter", "greeter", "{320B05A6-4E1B-4B15-B3F6-745819D2BF22}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Greeter.AppHost", "samples\Greeter\Greeter.AppHost\Greeter.AppHost.csproj", "{06B30F2A-BA17-451A-81FF-E9CC9551F671}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Greeter.ServiceDefaults", "samples\Greeter\Greeter.ServiceDefaults\Greeter.ServiceDefaults.csproj", "{E45990FD-85B3-44A2-8646-4AB2E868BC5F}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Greeter.AgentHost", "samples\Greeter\Greeter.AgentHost\Greeter.AgentHost.csproj", "{590BACCE-7310-4D7B-9618-46496F2EB171}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Greeter.AgentWorker", "samples\Greeter\Greeter.AgentWorker\Greeter.AgentWorker.csproj", "{7BA721F2-EE46-4A85-A8C8-3695C4ADF93E}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -76,6 +90,30 @@ Global {92CAAA29-8633-4984-B169-29BB89E0EA23}.Debug|Any CPU.Build.0 = Debug|Any CPU {92CAAA29-8633-4984-B169-29BB89E0EA23}.Release|Any CPU.ActiveCfg = Release|Any CPU {92CAAA29-8633-4984-B169-29BB89E0EA23}.Release|Any CPU.Build.0 = Release|Any CPU + {20E5C8C3-CE40-4FC3-96F8-B4A2C51936E9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {20E5C8C3-CE40-4FC3-96F8-B4A2C51936E9}.Debug|Any CPU.Build.0 = Debug|Any CPU + {20E5C8C3-CE40-4FC3-96F8-B4A2C51936E9}.Release|Any CPU.ActiveCfg = Release|Any CPU + {20E5C8C3-CE40-4FC3-96F8-B4A2C51936E9}.Release|Any CPU.Build.0 = Release|Any CPU + {B9188ADC-D322-4B38-B3D6-95338E89C34B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {B9188ADC-D322-4B38-B3D6-95338E89C34B}.Debug|Any CPU.Build.0 = Debug|Any CPU + {B9188ADC-D322-4B38-B3D6-95338E89C34B}.Release|Any CPU.ActiveCfg = Release|Any CPU + {B9188ADC-D322-4B38-B3D6-95338E89C34B}.Release|Any CPU.Build.0 = Release|Any CPU + {06B30F2A-BA17-451A-81FF-E9CC9551F671}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {06B30F2A-BA17-451A-81FF-E9CC9551F671}.Debug|Any CPU.Build.0 = Debug|Any CPU + {06B30F2A-BA17-451A-81FF-E9CC9551F671}.Release|Any CPU.ActiveCfg = Release|Any CPU + {06B30F2A-BA17-451A-81FF-E9CC9551F671}.Release|Any CPU.Build.0 = Release|Any CPU + {E45990FD-85B3-44A2-8646-4AB2E868BC5F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {E45990FD-85B3-44A2-8646-4AB2E868BC5F}.Debug|Any CPU.Build.0 = Debug|Any CPU + {E45990FD-85B3-44A2-8646-4AB2E868BC5F}.Release|Any CPU.ActiveCfg = Release|Any CPU + {E45990FD-85B3-44A2-8646-4AB2E868BC5F}.Release|Any CPU.Build.0 = Release|Any CPU + {590BACCE-7310-4D7B-9618-46496F2EB171}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {590BACCE-7310-4D7B-9618-46496F2EB171}.Debug|Any CPU.Build.0 = Debug|Any CPU + {590BACCE-7310-4D7B-9618-46496F2EB171}.Release|Any CPU.ActiveCfg = Release|Any CPU + {590BACCE-7310-4D7B-9618-46496F2EB171}.Release|Any CPU.Build.0 = Release|Any CPU + {7BA721F2-EE46-4A85-A8C8-3695C4ADF93E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {7BA721F2-EE46-4A85-A8C8-3695C4ADF93E}.Debug|Any CPU.Build.0 = Debug|Any CPU + {7BA721F2-EE46-4A85-A8C8-3695C4ADF93E}.Release|Any CPU.ActiveCfg = Release|Any CPU + {7BA721F2-EE46-4A85-A8C8-3695C4ADF93E}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -94,6 +132,13 @@ Global {EF5DF177-F4F2-49D5-9E1C-2E37869238D8} = {943853E7-513D-45EA-870F-549CFC0AF8E8} {92CAAA29-8633-4984-B169-29BB89E0EA23} = {65CF8F20-D740-46AC-A869-FA609D960A09} {65CF8F20-D740-46AC-A869-FA609D960A09} = {943853E7-513D-45EA-870F-549CFC0AF8E8} + {20E5C8C3-CE40-4FC3-96F8-B4A2C51936E9} = {290F9824-BAD3-4703-B9B7-FE9C4BE3A1CF} + {B9188ADC-D322-4B38-B3D6-95338E89C34B} = {290F9824-BAD3-4703-B9B7-FE9C4BE3A1CF} + {320B05A6-4E1B-4B15-B3F6-745819D2BF22} = {943853E7-513D-45EA-870F-549CFC0AF8E8} + {06B30F2A-BA17-451A-81FF-E9CC9551F671} = {320B05A6-4E1B-4B15-B3F6-745819D2BF22} + {E45990FD-85B3-44A2-8646-4AB2E868BC5F} = {320B05A6-4E1B-4B15-B3F6-745819D2BF22} + {590BACCE-7310-4D7B-9618-46496F2EB171} = {320B05A6-4E1B-4B15-B3F6-745819D2BF22} + {7BA721F2-EE46-4A85-A8C8-3695C4ADF93E} = {320B05A6-4E1B-4B15-B3F6-745819D2BF22} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {C9250809-2B94-4552-99DE-333E4646A16F} diff --git a/dotnet/Directory.Packages.props b/dotnet/Directory.Packages.props index 2fe425b4f..9b19e2243 100644 --- a/dotnet/Directory.Packages.props +++ b/dotnet/Directory.Packages.props @@ -5,6 +5,14 @@ + + + + + + + + @@ -23,6 +31,9 @@ + + + @@ -34,12 +45,14 @@ + + @@ -49,6 +62,11 @@ + + + + + diff --git a/dotnet/samples/Greeter/Greeter.AgentHost/Greeter.AgentHost.csproj b/dotnet/samples/Greeter/Greeter.AgentHost/Greeter.AgentHost.csproj new file mode 100644 index 000000000..39405ba65 --- /dev/null +++ b/dotnet/samples/Greeter/Greeter.AgentHost/Greeter.AgentHost.csproj @@ -0,0 +1,20 @@ + + + + net8.0 + enable + enable + + + + + + + + + + + + + + diff --git a/dotnet/samples/Greeter/Greeter.AgentHost/Program.cs b/dotnet/samples/Greeter/Greeter.AgentHost/Program.cs new file mode 100644 index 000000000..d7110ec27 --- /dev/null +++ b/dotnet/samples/Greeter/Greeter.AgentHost/Program.cs @@ -0,0 +1,17 @@ +using Microsoft.AI.Agents.Worker; + +var builder = WebApplication.CreateBuilder(args); +builder.AddServiceDefaults(); +builder.Services.AddProblemDetails(); +builder.Services.AddGrpc(); +builder.Logging.SetMinimumLevel(LogLevel.Information); + +builder.AddAgentService(); + +var app = builder.Build(); + +app.MapAgentService(); +app.UseExceptionHandler(); +app.MapDefaultEndpoints(); + +app.Run(); diff --git a/dotnet/samples/Greeter/Greeter.AgentHost/Properties/launchSettings.json b/dotnet/samples/Greeter/Greeter.AgentHost/Properties/launchSettings.json new file mode 100644 index 000000000..a0ff005e3 --- /dev/null +++ b/dotnet/samples/Greeter/Greeter.AgentHost/Properties/launchSettings.json @@ -0,0 +1,23 @@ +{ + "$schema": "https://json.schemastore.org/launchsettings.json", + "profiles": { + "http": { + "commandName": "Project", + "dotnetRunMessages": true, + "launchBrowser": true, + "applicationUrl": "http://localhost:5438", + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development" + } + }, + "https": { + "commandName": "Project", + "dotnetRunMessages": true, + "launchBrowser": true, + "applicationUrl": "https://localhost:7511;http://localhost:5438", + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development" + } + } + } +} diff --git a/dotnet/samples/Greeter/Greeter.AgentHost/appsettings.Development.json b/dotnet/samples/Greeter/Greeter.AgentHost/appsettings.Development.json new file mode 100644 index 000000000..585fa1ae0 --- /dev/null +++ b/dotnet/samples/Greeter/Greeter.AgentHost/appsettings.Development.json @@ -0,0 +1,8 @@ +{ + "AllowedHosts": "*", + "Kestrel": { + "EndpointDefaults": { + "Protocols": "Http2" + } + } +} diff --git a/dotnet/samples/Greeter/Greeter.AgentWorker/Client.cs b/dotnet/samples/Greeter/Greeter.AgentWorker/Client.cs new file mode 100644 index 000000000..a8503b11e --- /dev/null +++ b/dotnet/samples/Greeter/Greeter.AgentWorker/Client.cs @@ -0,0 +1,30 @@ +using Agents; +using Microsoft.AI.Agents.Worker.Client; +using AgentId = Microsoft.AI.Agents.Worker.Client.AgentId; + +namespace Greeter.AgentWorker; + +internal sealed class Client(ILogger logger, AgentWorkerRuntime runtime) : AgentBase(new ClientContext(logger, runtime)) +{ + private sealed class ClientContext(ILogger logger, AgentWorkerRuntime runtime) : IAgentContext + { + public AgentId AgentId { get; } = new AgentId("client", Guid.NewGuid().ToString()); + public AgentBase? AgentInstance { get; set; } + public ILogger Logger { get; } = logger; + + public async ValueTask PublishEventAsync(Event @event) + { + await runtime.PublishEvent(@event).ConfigureAwait(false); + } + + public async ValueTask SendRequestAsync(AgentBase agent, RpcRequest request) + { + await runtime.SendRequest(AgentInstance!, request).ConfigureAwait(false); + } + + public async ValueTask SendResponseAsync(RpcRequest request, RpcResponse response) + { + await runtime.SendResponse(response).ConfigureAwait(false); + } + } +} diff --git a/dotnet/samples/Greeter/Greeter.AgentWorker/Greeter.AgentWorker.csproj b/dotnet/samples/Greeter/Greeter.AgentWorker/Greeter.AgentWorker.csproj new file mode 100644 index 000000000..0b7a700bd --- /dev/null +++ b/dotnet/samples/Greeter/Greeter.AgentWorker/Greeter.AgentWorker.csproj @@ -0,0 +1,14 @@ + + + + net8.0 + enable + enable + + + + + + + + diff --git a/dotnet/samples/Greeter/Greeter.AgentWorker/Program.cs b/dotnet/samples/Greeter/Greeter.AgentWorker/Program.cs new file mode 100644 index 000000000..b84df774a --- /dev/null +++ b/dotnet/samples/Greeter/Greeter.AgentWorker/Program.cs @@ -0,0 +1,62 @@ +using Agents; +using Greeter.AgentWorker; +using Microsoft.AI.Agents.Worker.Client; +using AgentId = Microsoft.AI.Agents.Worker.Client.AgentId; + +var builder = WebApplication.CreateBuilder(args); + +// Add service defaults & Aspire components. +builder.AddServiceDefaults(); + +var agentBuilder = builder.AddAgentWorker("https://agenthost"); +agentBuilder.AddAgent("greeter"); +builder.Services.AddHostedService(); +builder.Services.AddSingleton(); + +var app = builder.Build(); + +app.MapDefaultEndpoints(); + +app.Run(); + +internal sealed class GreetingAgent(IAgentContext context, ILogger logger) : AgentBase(context) +{ + protected override Task HandleEvent(Microsoft.AI.Agents.Abstractions.Event @event) + { + logger.LogInformation("[{Id}] Received event: '{Event}'.", AgentId, @event); + return base.HandleEvent(@event); + } + + protected override Task HandleRequest(RpcRequest request) + { + logger.LogInformation("[{Id}] Received request: '{Request}'.", AgentId, request); + return Task.FromResult(new RpcResponse() { Result = "Okay!" }); + } +} + +internal sealed class MyBackgroundService(ILogger logger, Client client) : BackgroundService +{ + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + while (!stoppingToken.IsCancellationRequested) + { + try + { + var generatedCodeId = Guid.NewGuid().ToString(); + var instanceId = Guid.NewGuid().ToString(); + var response = await client.RequestAsync( + new AgentId("greeter", "foo"), + "echo", + new Dictionary { ["message"] = "Hello, agents!" }).ConfigureAwait(false); + + logger.LogInformation("Received response: {Response}", response); + } + catch (Exception exception) + { + logger.LogError(exception, "Error invoking request."); + } + + await Task.Delay(TimeSpan.FromMinutes(2), stoppingToken).ConfigureAwait(false); + } + } +} diff --git a/dotnet/samples/Greeter/Greeter.AgentWorker/Properties/launchSettings.json b/dotnet/samples/Greeter/Greeter.AgentWorker/Properties/launchSettings.json new file mode 100644 index 000000000..44c2fc16c --- /dev/null +++ b/dotnet/samples/Greeter/Greeter.AgentWorker/Properties/launchSettings.json @@ -0,0 +1,23 @@ +{ + "$schema": "https://json.schemastore.org/launchsettings.json", + "profiles": { + "http": { + "commandName": "Project", + "dotnetRunMessages": true, + "launchBrowser": true, + "applicationUrl": "http://localhost:5181", + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development" + } + }, + "https": { + "commandName": "Project", + "dotnetRunMessages": true, + "launchBrowser": true, + "applicationUrl": "https://localhost:7050;http://localhost:5181", + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development" + } + } + } +} diff --git a/dotnet/samples/Greeter/Greeter.AgentWorker/appsettings.Development.json b/dotnet/samples/Greeter/Greeter.AgentWorker/appsettings.Development.json new file mode 100644 index 000000000..0c208ae91 --- /dev/null +++ b/dotnet/samples/Greeter/Greeter.AgentWorker/appsettings.Development.json @@ -0,0 +1,8 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.AspNetCore": "Warning" + } + } +} diff --git a/dotnet/samples/Greeter/Greeter.AppHost/Greeter.AppHost.csproj b/dotnet/samples/Greeter/Greeter.AppHost/Greeter.AppHost.csproj new file mode 100644 index 000000000..c5a7f88f8 --- /dev/null +++ b/dotnet/samples/Greeter/Greeter.AppHost/Greeter.AppHost.csproj @@ -0,0 +1,27 @@ + + + + Exe + net8.0 + enable + enable + true + 6e251df6-43b1-498f-87a8-3cc77c302c21 + + + + + + + + + + + + + + + + + + diff --git a/dotnet/samples/Greeter/Greeter.AppHost/Program.cs b/dotnet/samples/Greeter/Greeter.AppHost/Program.cs new file mode 100644 index 000000000..b9ed96609 --- /dev/null +++ b/dotnet/samples/Greeter/Greeter.AppHost/Program.cs @@ -0,0 +1,21 @@ +var builder = DistributedApplication.CreateBuilder(args); + +builder.AddAzureProvisioning(); + +var orleans = builder.AddOrleans("orleans") + .WithDevelopmentClustering() + .WithMemoryReminders() + .WithMemoryGrainStorage("agent-state"); + +var agentHost = builder.AddProject("agenthost") + .WithReference(orleans); + +builder.AddProject("csharp-worker") + .WithExternalHttpEndpoints() + .WithReference(agentHost); + +var ep = agentHost.GetEndpoint("http"); +builder.AddExecutable("python-worker", "hatch", "../../../../python/", "run", "python", "worker_example.py") + .WithEnvironment("AGENT_HOST", $"{ep.Property(EndpointProperty.Host)}:{ep.Property(EndpointProperty.Port)}"); + +builder.Build().Run(); diff --git a/dotnet/samples/Greeter/Greeter.AppHost/Properties/launchSettings.json b/dotnet/samples/Greeter/Greeter.AppHost/Properties/launchSettings.json new file mode 100644 index 000000000..34c6d1968 --- /dev/null +++ b/dotnet/samples/Greeter/Greeter.AppHost/Properties/launchSettings.json @@ -0,0 +1,29 @@ +{ + "$schema": "https://json.schemastore.org/launchsettings.json", + "profiles": { + "https": { + "commandName": "Project", + "dotnetRunMessages": true, + "launchBrowser": true, + "applicationUrl": "https://localhost:17267;http://localhost:15190", + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development", + "DOTNET_ENVIRONMENT": "Development", + "DOTNET_DASHBOARD_OTLP_ENDPOINT_URL": "https://localhost:21107", + "DOTNET_RESOURCE_SERVICE_ENDPOINT_URL": "https://localhost:22230" + } + }, + "http": { + "commandName": "Project", + "dotnetRunMessages": true, + "launchBrowser": true, + "applicationUrl": "http://localhost:15190", + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development", + "DOTNET_ENVIRONMENT": "Development", + "DOTNET_DASHBOARD_OTLP_ENDPOINT_URL": "http://localhost:19078", + "DOTNET_RESOURCE_SERVICE_ENDPOINT_URL": "http://localhost:20236" + } + } + } +} diff --git a/dotnet/samples/Greeter/Greeter.AppHost/appsettings.Development.json b/dotnet/samples/Greeter/Greeter.AppHost/appsettings.Development.json new file mode 100644 index 000000000..0c208ae91 --- /dev/null +++ b/dotnet/samples/Greeter/Greeter.AppHost/appsettings.Development.json @@ -0,0 +1,8 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.AspNetCore": "Warning" + } + } +} diff --git a/dotnet/samples/Greeter/Greeter.ServiceDefaults/Extensions.cs b/dotnet/samples/Greeter/Greeter.ServiceDefaults/Extensions.cs new file mode 100644 index 000000000..ce94dc2c4 --- /dev/null +++ b/dotnet/samples/Greeter/Greeter.ServiceDefaults/Extensions.cs @@ -0,0 +1,111 @@ +using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.Diagnostics.HealthChecks; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Diagnostics.HealthChecks; +using Microsoft.Extensions.Logging; +using OpenTelemetry; +using OpenTelemetry.Metrics; +using OpenTelemetry.Trace; + +namespace Microsoft.Extensions.Hosting; + +// Adds common .NET Aspire services: service discovery, resilience, health checks, and OpenTelemetry. +// This project should be referenced by each service project in your solution. +// To learn more about using this project, see https://aka.ms/dotnet/aspire/service-defaults +public static class Extensions +{ + public static IHostApplicationBuilder AddServiceDefaults(this IHostApplicationBuilder builder) + { + builder.ConfigureOpenTelemetry(); + + builder.AddDefaultHealthChecks(); + + builder.Services.AddServiceDiscovery(); + + builder.Services.ConfigureHttpClientDefaults(http => + { + // Turn on resilience by default + http.AddStandardResilienceHandler(); + + // Turn on service discovery by default + http.AddServiceDiscovery(); + }); + + return builder; + } + + public static IHostApplicationBuilder ConfigureOpenTelemetry(this IHostApplicationBuilder builder) + { + builder.Logging.AddOpenTelemetry(logging => + { + logging.IncludeFormattedMessage = true; + logging.IncludeScopes = true; + }); + + builder.Services.AddOpenTelemetry() + .WithMetrics(metrics => + { + metrics.AddAspNetCoreInstrumentation() + .AddHttpClientInstrumentation() + .AddRuntimeInstrumentation(); + }) + .WithTracing(tracing => + { + tracing.AddAspNetCoreInstrumentation() + // Uncomment the following line to enable gRPC instrumentation (requires the OpenTelemetry.Instrumentation.GrpcNetClient package) + //.AddGrpcClientInstrumentation() + .AddHttpClientInstrumentation(); + }); + + builder.AddOpenTelemetryExporters(); + + return builder; + } + + private static IHostApplicationBuilder AddOpenTelemetryExporters(this IHostApplicationBuilder builder) + { + var useOtlpExporter = !string.IsNullOrWhiteSpace(builder.Configuration["OTEL_EXPORTER_OTLP_ENDPOINT"]); + + if (useOtlpExporter) + { + builder.Services.AddOpenTelemetry().UseOtlpExporter(); + } + + // Uncomment the following lines to enable the Azure Monitor exporter (requires the Azure.Monitor.OpenTelemetry.AspNetCore package) + //if (!string.IsNullOrEmpty(builder.Configuration["APPLICATIONINSIGHTS_CONNECTION_STRING"])) + //{ + // builder.Services.AddOpenTelemetry() + // .UseAzureMonitor(); + //} + + return builder; + } + + public static IHostApplicationBuilder AddDefaultHealthChecks(this IHostApplicationBuilder builder) + { + builder.Services.AddHealthChecks() + // Add a default liveness check to ensure app is responsive + .AddCheck("self", () => HealthCheckResult.Healthy(), ["live"]); + + return builder; + } + + public static WebApplication MapDefaultEndpoints(this WebApplication app) + { + // Adding health checks endpoints to applications in non-development environments has security implications. + // See https://aka.ms/dotnet/aspire/healthchecks for details before enabling these endpoints in non-development environments. + if (app.Environment.IsDevelopment()) + { + // All health checks must pass for app to be considered ready to accept traffic after starting + app.MapHealthChecks("/health"); + + // Only health checks tagged with the "live" tag must pass for app to be considered alive + app.MapHealthChecks("/alive", new HealthCheckOptions + { + Predicate = r => r.Tags.Contains("live") + }); + } + + return app; + } +} diff --git a/dotnet/samples/Greeter/Greeter.ServiceDefaults/Greeter.ServiceDefaults.csproj b/dotnet/samples/Greeter/Greeter.ServiceDefaults/Greeter.ServiceDefaults.csproj new file mode 100644 index 000000000..2388aea65 --- /dev/null +++ b/dotnet/samples/Greeter/Greeter.ServiceDefaults/Greeter.ServiceDefaults.csproj @@ -0,0 +1,22 @@ + + + + net8.0 + enable + enable + true + + + + + + + + + + + + + + + diff --git a/dotnet/samples/gh-flow/src/Microsoft.AI.DevTeam.Dapr/Agents/AzureGenie.cs b/dotnet/samples/gh-flow/src/Microsoft.AI.DevTeam.Dapr/Agents/AzureGenie.cs index 7417f022c..ffd1349ef 100644 --- a/dotnet/samples/gh-flow/src/Microsoft.AI.DevTeam.Dapr/Agents/AzureGenie.cs +++ b/dotnet/samples/gh-flow/src/Microsoft.AI.DevTeam.Dapr/Agents/AzureGenie.cs @@ -24,8 +24,9 @@ public class AzureGenie : Agent, IDaprAgent { var context = item.ToGithubContext(); await Store(context.Org, context.Repo, context.ParentNumber ?? 0, context.IssueNumber, "readme", "md", "output", item.Data["readme"]); - await PublishEvent(Consts.PubSub, Consts.MainTopic, new Event + await PublishEvent(new Event { + Namespace = Consts.MainTopic, Type = nameof(GithubFlowEventType.ReadmeStored), Subject = context.Subject, Data = context.ToData() @@ -38,8 +39,9 @@ public class AzureGenie : Agent, IDaprAgent var context = item.ToGithubContext(); await Store(context.Org, context.Repo, context.ParentNumber ?? 0, context.IssueNumber, "run", "sh", "output", item.Data["code"]); await RunInSandbox(context.Org, context.Repo, context.ParentNumber ?? 0, context.IssueNumber); - await PublishEvent(Consts.PubSub, Consts.MainTopic, new Event + await PublishEvent(new Event { + Namespace = Consts.MainTopic, Type = nameof(GithubFlowEventType.SandboxRunCreated), Subject = context.Subject, Data = context.ToData() diff --git a/dotnet/samples/gh-flow/src/Microsoft.AI.DevTeam.Dapr/Agents/Developer/Developer.cs b/dotnet/samples/gh-flow/src/Microsoft.AI.DevTeam.Dapr/Agents/Developer/Developer.cs index 3f22c00ee..017ec8b05 100644 --- a/dotnet/samples/gh-flow/src/Microsoft.AI.DevTeam.Dapr/Agents/Developer/Developer.cs +++ b/dotnet/samples/gh-flow/src/Microsoft.AI.DevTeam.Dapr/Agents/Developer/Developer.cs @@ -27,8 +27,9 @@ public class Dev : AiAgent, IDaprAgent var code = await GenerateCode(item.Data["input"]); var data = context.ToData(); data["result"] = code; - await PublishEvent(Consts.PubSub, Consts.MainTopic, new Event + await PublishEvent(new Event { + Namespace = Consts.MainTopic, Type = nameof(GithubFlowEventType.CodeGenerated), Subject = context.Subject, Data = data @@ -41,8 +42,9 @@ public class Dev : AiAgent, IDaprAgent var lastCode = state.History.Last().Message; var data = context.ToData(); data["code"] = lastCode; - await PublishEvent(Consts.PubSub, Consts.MainTopic, new Event + await PublishEvent(new Event { + Namespace = Consts.MainTopic, Type = nameof(GithubFlowEventType.CodeCreated), Subject = context.Subject, Data = data diff --git a/dotnet/samples/gh-flow/src/Microsoft.AI.DevTeam.Dapr/Agents/DeveloperLead/DeveloperLead.cs b/dotnet/samples/gh-flow/src/Microsoft.AI.DevTeam.Dapr/Agents/DeveloperLead/DeveloperLead.cs index 899ce6911..218bdac07 100644 --- a/dotnet/samples/gh-flow/src/Microsoft.AI.DevTeam.Dapr/Agents/DeveloperLead/DeveloperLead.cs +++ b/dotnet/samples/gh-flow/src/Microsoft.AI.DevTeam.Dapr/Agents/DeveloperLead/DeveloperLead.cs @@ -28,8 +28,9 @@ public class DeveloperLead : AiAgent, IDaprAgent var plan = await CreatePlan(item.Data["input"]); var data = context.ToData(); data["result"] = plan; - await PublishEvent(Consts.PubSub, Consts.MainTopic, new Event + await PublishEvent(new Event { + Namespace = Consts.MainTopic, Type = nameof(GithubFlowEventType.DevPlanGenerated), Subject = context.Subject, Data = data @@ -42,8 +43,9 @@ public class DeveloperLead : AiAgent, IDaprAgent var latestPlan = state.History.Last().Message; var data = context.ToData(); data["plan"] = latestPlan; - await PublishEvent(Consts.PubSub, Consts.MainTopic, new Event + await PublishEvent(new Event { + Namespace = Consts.MainTopic, Type = nameof(GithubFlowEventType.DevPlanCreated), Subject = context.Subject, Data = data diff --git a/dotnet/samples/gh-flow/src/Microsoft.AI.DevTeam.Dapr/Agents/ProductManager/ProductManager.cs b/dotnet/samples/gh-flow/src/Microsoft.AI.DevTeam.Dapr/Agents/ProductManager/ProductManager.cs index 3e8e62977..03c5e773b 100644 --- a/dotnet/samples/gh-flow/src/Microsoft.AI.DevTeam.Dapr/Agents/ProductManager/ProductManager.cs +++ b/dotnet/samples/gh-flow/src/Microsoft.AI.DevTeam.Dapr/Agents/ProductManager/ProductManager.cs @@ -29,8 +29,9 @@ public class ProductManager : AiAgent, IDaprAgent var readme = await CreateReadme(item.Data["input"]); var data = context.ToData(); data["result"] = readme; - await PublishEvent(Consts.PubSub, Consts.MainTopic, new Event + await PublishEvent(new Event { + Namespace = Consts.MainTopic, Type = nameof(GithubFlowEventType.ReadmeGenerated), Subject = context.Subject, Data = data @@ -43,8 +44,9 @@ public class ProductManager : AiAgent, IDaprAgent var lastReadme = state.History.Last().Message; var data = context.ToData(); data["readme"] = lastReadme; - await PublishEvent(Consts.PubSub, Consts.MainTopic, new Event + await PublishEvent(new Event { + Namespace = Consts.MainTopic, Type = nameof(GithubFlowEventType.ReadmeCreated), Subject = context.Subject, Data = data diff --git a/dotnet/samples/gh-flow/src/Microsoft.AI.DevTeam.Dapr/Agents/Sandbox.cs b/dotnet/samples/gh-flow/src/Microsoft.AI.DevTeam.Dapr/Agents/Sandbox.cs index a3e61863c..867857dca 100644 --- a/dotnet/samples/gh-flow/src/Microsoft.AI.DevTeam.Dapr/Agents/Sandbox.cs +++ b/dotnet/samples/gh-flow/src/Microsoft.AI.DevTeam.Dapr/Agents/Sandbox.cs @@ -84,8 +84,9 @@ public class Sandbox : Agent, IDaprAgent, IRemindable { "parentNumber", agentState.ParentIssueNumber.ToString() } }; var subject = $"{agentState.Org}-{agentState.Repo}-{agentState.IssueNumber}"; - await PublishEvent(Consts.PubSub, Consts.MainTopic, new Event + await PublishEvent(new Event { + Namespace = Consts.MainTopic, Type = nameof(GithubFlowEventType.SandboxRunFinished), Subject = subject, Data = data diff --git a/dotnet/samples/gh-flow/src/Microsoft.AI.DevTeam.Dapr/Services/GithubWebHookProcessor.cs b/dotnet/samples/gh-flow/src/Microsoft.AI.DevTeam.Dapr/Services/GithubWebHookProcessor.cs index 91edb7709..40ec48e6f 100644 --- a/dotnet/samples/gh-flow/src/Microsoft.AI.DevTeam.Dapr/Services/GithubWebHookProcessor.cs +++ b/dotnet/samples/gh-flow/src/Microsoft.AI.DevTeam.Dapr/Services/GithubWebHookProcessor.cs @@ -117,6 +117,7 @@ public sealed class GithubWebHookProcessor : WebhookEventProcessor var evt = new Event { + Namespace = subject, Type = eventType, Subject = subject, Data = data @@ -148,6 +149,7 @@ public sealed class GithubWebHookProcessor : WebhookEventProcessor }; var evt = new Event { + Namespace = subject, Type = eventType, Subject = subject, Data = data diff --git a/dotnet/samples/gh-flow/src/Microsoft.AI.DevTeam/Agents/AzureGenie.cs b/dotnet/samples/gh-flow/src/Microsoft.AI.DevTeam/Agents/AzureGenie.cs index 5590af7fd..6ba98fb95 100644 --- a/dotnet/samples/gh-flow/src/Microsoft.AI.DevTeam/Agents/AzureGenie.cs +++ b/dotnet/samples/gh-flow/src/Microsoft.AI.DevTeam/Agents/AzureGenie.cs @@ -28,8 +28,9 @@ public class AzureGenie : Agent { var context = item.ToGithubContext(); await Store(context.Org, context.Repo, context.ParentNumber ?? 0, context.IssueNumber, "readme", "md", "output", item.Data["readme"]); - await PublishEvent(Consts.MainNamespace, this.GetPrimaryKeyString(), new Event + await PublishEvent(new Event { + Namespace = this.GetPrimaryKeyString(), Type = nameof(GithubFlowEventType.ReadmeStored), Subject = context.Subject, Data = context.ToData() @@ -42,8 +43,9 @@ public class AzureGenie : Agent var context = item.ToGithubContext(); await Store(context.Org, context.Repo, context.ParentNumber ?? 0, context.IssueNumber, "run", "sh", "output", item.Data["code"]); await RunInSandbox(context.Org, context.Repo, context.ParentNumber ?? 0, context.IssueNumber); - await PublishEvent(Consts.MainNamespace, this.GetPrimaryKeyString(), new Event + await PublishEvent(new Event { + Namespace = this.GetPrimaryKeyString(), Type = nameof(GithubFlowEventType.SandboxRunCreated), Subject = context.Subject, Data = context.ToData() diff --git a/dotnet/samples/gh-flow/src/Microsoft.AI.DevTeam/Agents/Developer/Developer.cs b/dotnet/samples/gh-flow/src/Microsoft.AI.DevTeam/Agents/Developer/Developer.cs index 86d63560d..065e82d09 100644 --- a/dotnet/samples/gh-flow/src/Microsoft.AI.DevTeam/Agents/Developer/Developer.cs +++ b/dotnet/samples/gh-flow/src/Microsoft.AI.DevTeam/Agents/Developer/Developer.cs @@ -31,8 +31,9 @@ public class Dev : AiAgent, IDevelopApps var code = await GenerateCode(item.Data["input"]); var data = context.ToData(); data["result"] = code; - await PublishEvent(Consts.MainNamespace, this.GetPrimaryKeyString(), new Event + await PublishEvent(new Event { + Namespace = this.GetPrimaryKeyString(), Type = nameof(GithubFlowEventType.CodeGenerated), Subject = context.Subject, Data = data @@ -46,8 +47,9 @@ public class Dev : AiAgent, IDevelopApps var lastCode = _state.State.History.Last().Message; var data = context.ToData(); data["code"] = lastCode; - await PublishEvent(Consts.MainNamespace, this.GetPrimaryKeyString(), new Event + await PublishEvent(new Event { + Namespace = this.GetPrimaryKeyString(), Type = nameof(GithubFlowEventType.CodeCreated), Subject = context.Subject, Data = data diff --git a/dotnet/samples/gh-flow/src/Microsoft.AI.DevTeam/Agents/DeveloperLead/DeveloperLead.cs b/dotnet/samples/gh-flow/src/Microsoft.AI.DevTeam/Agents/DeveloperLead/DeveloperLead.cs index c70737d4f..80eb0de6e 100644 --- a/dotnet/samples/gh-flow/src/Microsoft.AI.DevTeam/Agents/DeveloperLead/DeveloperLead.cs +++ b/dotnet/samples/gh-flow/src/Microsoft.AI.DevTeam/Agents/DeveloperLead/DeveloperLead.cs @@ -31,8 +31,9 @@ public class DeveloperLead : AiAgent, ILeadDevelopers var plan = await CreatePlan(item.Data["input"]); var data = context.ToData(); data["result"] = plan; - await PublishEvent(Consts.MainNamespace, this.GetPrimaryKeyString(), new Event + await PublishEvent(new Event { + Namespace = this.GetPrimaryKeyString(), Type = nameof(GithubFlowEventType.DevPlanGenerated), Subject = context.Subject, Data = data @@ -46,8 +47,9 @@ public class DeveloperLead : AiAgent, ILeadDevelopers var latestPlan = _state.State.History.Last().Message; var data = context.ToData(); data["plan"] = latestPlan; - await PublishEvent(Consts.MainNamespace, this.GetPrimaryKeyString(), new Event + await PublishEvent(new Event { + Namespace = this.GetPrimaryKeyString(), Type = nameof(GithubFlowEventType.DevPlanCreated), Subject = context.Subject, Data = data diff --git a/dotnet/samples/gh-flow/src/Microsoft.AI.DevTeam/Agents/ProductManager/ProductManager.cs b/dotnet/samples/gh-flow/src/Microsoft.AI.DevTeam/Agents/ProductManager/ProductManager.cs index 54df8699a..a2debdb24 100644 --- a/dotnet/samples/gh-flow/src/Microsoft.AI.DevTeam/Agents/ProductManager/ProductManager.cs +++ b/dotnet/samples/gh-flow/src/Microsoft.AI.DevTeam/Agents/ProductManager/ProductManager.cs @@ -30,8 +30,9 @@ public class ProductManager : AiAgent, IManageProducts var readme = await CreateReadme(item.Data["input"]); var data = context.ToData(); data["result"] = readme; - await PublishEvent(Consts.MainNamespace, this.GetPrimaryKeyString(), new Event + await PublishEvent(new Event { + Namespace = this.GetPrimaryKeyString(), Type = nameof(GithubFlowEventType.ReadmeGenerated), Subject = context.Subject, Data = data @@ -45,8 +46,9 @@ public class ProductManager : AiAgent, IManageProducts var lastReadme = _state.State.History.Last().Message; var data = context.ToData(); data["readme"] = lastReadme; - await PublishEvent(Consts.MainNamespace, this.GetPrimaryKeyString(), new Event + await PublishEvent(new Event { + Namespace = this.GetPrimaryKeyString(), Type = nameof(GithubFlowEventType.ReadmeCreated), Subject = context.Subject, Data = data diff --git a/dotnet/samples/gh-flow/src/Microsoft.AI.DevTeam/Agents/Sandbox.cs b/dotnet/samples/gh-flow/src/Microsoft.AI.DevTeam/Agents/Sandbox.cs index d9fefaeb4..f0adcac8c 100644 --- a/dotnet/samples/gh-flow/src/Microsoft.AI.DevTeam/Agents/Sandbox.cs +++ b/dotnet/samples/gh-flow/src/Microsoft.AI.DevTeam/Agents/Sandbox.cs @@ -58,14 +58,16 @@ public sealed class Sandbox : Agent, IRemindable if (await _azService.IsSandboxCompleted(sandboxId)) { await _azService.DeleteSandbox(sandboxId); - await PublishEvent(Consts.MainNamespace, this.GetPrimaryKeyString(), new Event + await PublishEvent(new Event { + Namespace = this.GetPrimaryKeyString(), Type = nameof(GithubFlowEventType.SandboxRunFinished), - Data = new Dictionary { - { "org", _state.State.Org }, - { "repo", _state.State.Repo }, - { "issueNumber", _state.State.IssueNumber.ToString() }, - { "parentNumber", _state.State.ParentIssueNumber.ToString() } + Data = new Dictionary + { + ["org"] = _state.State.Org, + ["repo"] = _state.State.Repo, + ["issueNumber"] = _state.State.IssueNumber.ToString(), + ["parentNumber"] = _state.State.ParentIssueNumber.ToString() } }); await Cleanup(); diff --git a/dotnet/samples/gh-flow/src/Microsoft.AI.DevTeam/Services/GithubWebHookProcessor.cs b/dotnet/samples/gh-flow/src/Microsoft.AI.DevTeam/Services/GithubWebHookProcessor.cs index 20e544089..cbe9b2c95 100644 --- a/dotnet/samples/gh-flow/src/Microsoft.AI.DevTeam/Services/GithubWebHookProcessor.cs +++ b/dotnet/samples/gh-flow/src/Microsoft.AI.DevTeam/Services/GithubWebHookProcessor.cs @@ -117,7 +117,7 @@ public sealed class GithubWebHookProcessor : WebhookEventProcessor { var subject = suffix + issueNumber.ToString(); var streamProvider = _client.GetStreamProvider("StreamProvider"); - var streamId = StreamId.Create(Consts.MainNamespace, subject); + var streamId = StreamId.Create(ns: "default", key: subject); var stream = streamProvider.GetStream(streamId); var eventType = (skillName, functionName) switch { @@ -128,14 +128,15 @@ public sealed class GithubWebHookProcessor : WebhookEventProcessor }; var data = new Dictionary { - { "org", org }, - { "repo", repo }, - { "issueNumber", issueNumber.ToString() }, - { "parentNumber", (parentNumber ?? 0).ToString()} + ["org"] = org, + ["repo"] = repo, + ["issueNumber"] = issueNumber.ToString(), + ["parentNumber"] = (parentNumber ?? 0).ToString() }; await stream.OnNextAsync(new Event { + Namespace = subject, Type = eventType, Subject = subject, Data = data @@ -171,6 +172,7 @@ public sealed class GithubWebHookProcessor : WebhookEventProcessor }; await stream.OnNextAsync(new Event { + Namespace = subject, Type = eventType, Subject = subject, Data = data diff --git a/dotnet/samples/marketing/src/backend/Agents/CommunityManager/CommunityManager.cs b/dotnet/samples/marketing/src/backend/Agents/CommunityManager/CommunityManager.cs index 583ed330a..670a59e00 100644 --- a/dotnet/samples/marketing/src/backend/Agents/CommunityManager/CommunityManager.cs +++ b/dotnet/samples/marketing/src/backend/Agents/CommunityManager/CommunityManager.cs @@ -60,13 +60,15 @@ public class CommunityManager : AiAgent private async Task SendDesignedCreatedEvent(string socialMediaPost, string userId) { - await PublishEvent(Consts.OrleansNamespace, this.GetPrimaryKeyString(), new Event + await PublishEvent(new Event { + Namespace = this.GetPrimaryKeyString(), Type = nameof(EventTypes.SocialMediaPostCreated), - Data = new Dictionary { - { "UserId", userId }, - { nameof(socialMediaPost), socialMediaPost} - } + Data = new Dictionary + { + ["UserId"] = userId, + [nameof(socialMediaPost)] = socialMediaPost, + } }); } @@ -74,4 +76,4 @@ public class CommunityManager : AiAgent { return Task.FromResult(_state.State.Data.WrittenSocialMediaPost); } -} \ No newline at end of file +} diff --git a/dotnet/samples/marketing/src/backend/Agents/GraphicDesigner/GraphicDesigner.cs b/dotnet/samples/marketing/src/backend/Agents/GraphicDesigner/GraphicDesigner.cs index 21e3b6235..0ee7cdc35 100644 --- a/dotnet/samples/marketing/src/backend/Agents/GraphicDesigner/GraphicDesigner.cs +++ b/dotnet/samples/marketing/src/backend/Agents/GraphicDesigner/GraphicDesigner.cs @@ -61,8 +61,9 @@ public class GraphicDesigner : AiAgent private async Task SendDesignedCreatedEvent(string imageUri, string userId) { - await PublishEvent(Consts.OrleansNamespace, this.GetPrimaryKeyString(), new Event + await PublishEvent(new Event { + Namespace = this.GetPrimaryKeyString(), Type = nameof(EventTypes.GraphicDesignCreated), Data = new Dictionary { { "UserId", userId }, diff --git a/dotnet/samples/marketing/src/backend/Agents/Writer/Writer.cs b/dotnet/samples/marketing/src/backend/Agents/Writer/Writer.cs index 844261326..55dcb3703 100644 --- a/dotnet/samples/marketing/src/backend/Agents/Writer/Writer.cs +++ b/dotnet/samples/marketing/src/backend/Agents/Writer/Writer.cs @@ -57,8 +57,9 @@ public class Writer : AiAgent, IWriter private async Task SendDesignedCreatedEvent(string article, string userId) { - await PublishEvent(Consts.OrleansNamespace, this.GetPrimaryKeyString(), new Event + await PublishEvent(new Event { + Namespace = this.GetPrimaryKeyString(), Type = nameof(EventTypes.ArticleCreated), Data = new Dictionary { { "UserId", userId }, diff --git a/dotnet/samples/marketing/src/backend/Controller/Articles.cs b/dotnet/samples/marketing/src/backend/Controller/Articles.cs index 4c4e8664c..0f77f2996 100644 --- a/dotnet/samples/marketing/src/backend/Controller/Articles.cs +++ b/dotnet/samples/marketing/src/backend/Controller/Articles.cs @@ -1,4 +1,4 @@ -using Marketing.Agents; +using Marketing.Agents; using Marketing.Events; using Marketing.Options; using Microsoft.AI.Agents.Abstractions; @@ -53,6 +53,7 @@ public class Articles : ControllerBase await stream.OnNextAsync(new Event { + Namespace = UserId, Type = nameof(EventTypes.UserChatInput), Data = data }); diff --git a/dotnet/samples/marketing/src/backend/Options/Consts.cs b/dotnet/samples/marketing/src/backend/Options/Consts.cs index 7effc35f6..bfc101cc4 100644 --- a/dotnet/samples/marketing/src/backend/Options/Consts.cs +++ b/dotnet/samples/marketing/src/backend/Options/Consts.cs @@ -1,6 +1,6 @@ -namespace Marketing.Options; +namespace Marketing.Options; public static class Consts { - public const string OrleansNamespace = "DevPersonas"; + public const string OrleansNamespace = "default"; } diff --git a/dotnet/samples/marketing/src/backend/SignalRHub/ArticleHub.cs b/dotnet/samples/marketing/src/backend/SignalRHub/ArticleHub.cs index 579d0bc5c..707560f93 100644 --- a/dotnet/samples/marketing/src/backend/SignalRHub/ArticleHub.cs +++ b/dotnet/samples/marketing/src/backend/SignalRHub/ArticleHub.cs @@ -1,4 +1,4 @@ -namespace Marketing.SignalRHub; +namespace Marketing.SignalRHub; using Microsoft.AI.Agents.Abstractions; using Microsoft.AspNetCore.SignalR; @@ -41,6 +41,7 @@ public class ArticleHub : Hub await stream.OnNextAsync(new Event { + Namespace = frontEndMessage.UserId, Type = nameof(EventTypes.UserChatInput), Data = data }); @@ -69,8 +70,9 @@ public class ArticleHub : Hub }; await stream.OnNextAsync(new Event { + Namespace = frontEndMessage.UserId, Type = nameof(EventTypes.UserConnected), Data = data }); } -} \ No newline at end of file +} diff --git a/dotnet/spelling.dic b/dotnet/spelling.dic index e69de29bb..6655c19fc 100644 --- a/dotnet/spelling.dic +++ b/dotnet/spelling.dic @@ -0,0 +1,3 @@ +qdrant +orleans +openai diff --git a/dotnet/src/Microsoft.AI.Agents.Dapr/Agent.cs b/dotnet/src/Microsoft.AI.Agents.Dapr/Agent.cs index 10e4c95fa..1ac11bf74 100644 --- a/dotnet/src/Microsoft.AI.Agents.Dapr/Agent.cs +++ b/dotnet/src/Microsoft.AI.Agents.Dapr/Agent.cs @@ -1,4 +1,4 @@ -using Dapr.Actors.Runtime; +using Dapr.Actors.Runtime; using Dapr.Client; using Microsoft.AI.Agents.Abstractions; @@ -14,14 +14,15 @@ public abstract class Agent : Actor, IAgent } public abstract Task HandleEvent(Event item); - public async Task PublishEvent(string ns, string id, Event item) + public async Task PublishEvent(Event item) { - var metadata = new Dictionary() { - { "cloudevent.Type", item.Type }, - { "cloudevent.Subject", item.Subject }, - { "cloudevent.id", Guid.NewGuid().ToString()} - }; + var metadata = new Dictionary() + { + ["cloudevent.Type"] = item.Type, + ["cloudevent.Subject"] = item.Subject, + ["cloudevent.id"] = Guid.NewGuid().ToString() + }; - await _daprClient.PublishEventAsync(ns, id, item, metadata).ConfigureAwait(false); + await _daprClient.PublishEventAsync("default", item.Namespace, item, metadata).ConfigureAwait(false); } } diff --git a/dotnet/src/Microsoft.AI.Agents.Orleans/Agent.cs b/dotnet/src/Microsoft.AI.Agents.Orleans/Agent.cs index 0e803067b..e8d635b0c 100644 --- a/dotnet/src/Microsoft.AI.Agents.Orleans/Agent.cs +++ b/dotnet/src/Microsoft.AI.Agents.Orleans/Agent.cs @@ -1,4 +1,4 @@ -using Microsoft.AI.Agents.Abstractions; +using Microsoft.AI.Agents.Abstractions; using Orleans.Runtime; using Orleans.Streams; @@ -14,10 +14,10 @@ public abstract class Agent : Grain, IGrainWithStringKey, IAgent await HandleEvent(item).ConfigureAwait(true); } - public async Task PublishEvent(string ns, string id, Event item) + public async Task PublishEvent(Event item) { var streamProvider = this.GetStreamProvider("StreamProvider"); - var streamId = StreamId.Create(ns, id); + var streamId = StreamId.Create(ns: "default", key: item.Namespace); var stream = streamProvider.GetStream(streamId); await stream.OnNextAsync(item).ConfigureAwait(true); } diff --git a/dotnet/src/Microsoft.AI.Agents.Orleans/EventSurrogate.cs b/dotnet/src/Microsoft.AI.Agents.Orleans/EventSurrogate.cs index 9e550f056..f24cdd77d 100644 --- a/dotnet/src/Microsoft.AI.Agents.Orleans/EventSurrogate.cs +++ b/dotnet/src/Microsoft.AI.Agents.Orleans/EventSurrogate.cs @@ -6,10 +6,12 @@ namespace Microsoft.AI.Agents.Orleans; internal struct EventSurrogate { [Id(0)] - public Dictionary Data { get; set; } + public string Namespace { get; set; } [Id(1)] - public string Type { get; set; } + public Dictionary Data { get; set; } [Id(2)] + public string Type { get; set; } + [Id(3)] public string Subject { get; set; } } @@ -19,12 +21,13 @@ internal sealed class EventSurrogateConverter : { public Event ConvertFromSurrogate( in EventSurrogate surrogate) => - new Event { Data = surrogate.Data, Subject = surrogate.Subject, Type = surrogate.Type }; + new() { Namespace = surrogate.Namespace, Data = surrogate.Data, Subject = surrogate.Subject, Type = surrogate.Type }; public EventSurrogate ConvertToSurrogate( in Event value) => - new EventSurrogate + new() { + Namespace = value.Namespace, Data = value.Data, Type = value.Type, Subject = value.Subject diff --git a/dotnet/src/Microsoft.AI.Agents.Worker.Client/Agent.cs b/dotnet/src/Microsoft.AI.Agents.Worker.Client/Agent.cs new file mode 100644 index 000000000..d25c2484e --- /dev/null +++ b/dotnet/src/Microsoft.AI.Agents.Worker.Client/Agent.cs @@ -0,0 +1,13 @@ +using Microsoft.AI.Agents.Abstractions; + +namespace Microsoft.AI.Agents.Worker.Client; + +public abstract class Agent(IAgentContext context) : AgentBase(context), IAgent +{ + Task IAgent.HandleEvent(Event item) => base.HandleEvent(item); + + async Task IAgent.PublishEvent(Event item) + { + await base.PublishEvent(item); + } +} diff --git a/dotnet/src/Microsoft.AI.Agents.Worker.Client/AgentBase.cs b/dotnet/src/Microsoft.AI.Agents.Worker.Client/AgentBase.cs new file mode 100644 index 000000000..d2d2255d4 --- /dev/null +++ b/dotnet/src/Microsoft.AI.Agents.Worker.Client/AgentBase.cs @@ -0,0 +1,153 @@ +using Agents; +using Event = Microsoft.AI.Agents.Abstractions.Event; +using System.Threading.Channels; +using Microsoft.Extensions.Logging; +using System.Text.Json; + +namespace Microsoft.AI.Agents.Worker.Client; + +public abstract class AgentBase +{ + private readonly object _lock = new(); + private readonly Dictionary> _pendingRequests = []; + private readonly Channel _mailbox = Channel.CreateUnbounded(); + private readonly IAgentContext _context; + + protected internal AgentId AgentId => _context.AgentId; + protected internal ILogger Logger => _context.Logger; + protected internal IAgentContext Context => _context; + + protected AgentBase(IAgentContext context) + { + _context = context; + context.AgentInstance = this; + Completion = Start(); + } + + internal Task Completion { get; } + + internal Task Start() + { + var didSuppress = false; + if (!ExecutionContext.IsFlowSuppressed()) + { + didSuppress = true; + ExecutionContext.SuppressFlow(); + } + + try + { + return Task.Run(RunMessagePump); + } + finally + { + if (didSuppress) + { + ExecutionContext.RestoreFlow(); + } + } + } + + internal void ReceiveMessage(Message message) => _mailbox.Writer.TryWrite(message); + + private async Task RunMessagePump() + { + await Task.CompletedTask.ConfigureAwait(ConfigureAwaitOptions.ForceYielding); + await foreach (var message in _mailbox.Reader.ReadAllAsync()) + { + try + { + switch (message) + { + case Message msg: + await HandleRpcMessage(msg).ConfigureAwait(false); + break; + default: + throw new InvalidOperationException($"Unexpected message '{message}'."); + } + } + catch (Exception ex) + { + _context.Logger.LogError(ex, "Error processing message."); + } + } + } + + private async Task HandleRpcMessage(Message msg) + { + switch (msg.MessageCase) + { + case Message.MessageOneofCase.Event: + await HandleEvent(msg.Event.ToEvent()).ConfigureAwait(false); + break; + case Message.MessageOneofCase.Request: + await OnRequestCore(msg.Request).ConfigureAwait(false); + break; + case Message.MessageOneofCase.Response: + OnResponseCore(msg.Response); + break; + } + } + + private void OnResponseCore(RpcResponse response) + { + var requestId = response.RequestId; + TaskCompletionSource? completion; + lock (_lock) + { + if (!_pendingRequests.Remove(requestId, out completion)) + { + throw new InvalidOperationException($"Unknown request id '{requestId}'."); + } + } + + completion.SetResult(response); + } + + private async ValueTask OnRequestCore(RpcRequest request) + { + RpcResponse response; + + try + { + response = await HandleRequest(request).ConfigureAwait(false); + } + catch (Exception ex) + { + response = new RpcResponse { Error = ex.Message }; + } + + await _context.SendResponseAsync(request, response).ConfigureAwait(false); + } + + public async Task RequestAsync(AgentId target, string method, Dictionary parameters) + { + var requestId = Guid.NewGuid().ToString(); + var request = new RpcRequest + { + Target = target, + RequestId = requestId, + Method = method, + Data = JsonSerializer.Serialize(parameters) + }; + + var completion = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + lock (_lock) + { + _pendingRequests[requestId] = completion; + } + + await _context.SendRequestAsync(this, request).ConfigureAwait(false); + return await completion.Task.ConfigureAwait(false); + } + + protected internal async ValueTask PublishEvent(Event @event) + { + var rpcEvent = @event.ToRpcEvent(); + await _context.PublishEventAsync(rpcEvent).ConfigureAwait(false); + } + + protected internal virtual Task HandleRequest(RpcRequest request) => Task.FromResult(new RpcResponse { Error = "Not implemented" }); + + protected internal virtual Task HandleEvent(Event @event) => Task.CompletedTask; +} diff --git a/dotnet/src/Microsoft.AI.Agents.Worker.Client/AgentContext.cs b/dotnet/src/Microsoft.AI.Agents.Worker.Client/AgentContext.cs new file mode 100644 index 000000000..426cca538 --- /dev/null +++ b/dotnet/src/Microsoft.AI.Agents.Worker.Client/AgentContext.cs @@ -0,0 +1,30 @@ +using Agents; +using RpcEvent = Agents.Event; +using Microsoft.Extensions.Logging; + +namespace Microsoft.AI.Agents.Worker.Client; + +internal sealed class AgentContext(AgentId agentId, AgentWorkerRuntime runtime, ILogger logger) : IAgentContext +{ + private readonly AgentWorkerRuntime _runtime = runtime; + + public AgentId AgentId { get; } = agentId; + public ILogger Logger { get; } = logger; + public AgentBase? AgentInstance { get; set; } + + public async ValueTask SendResponseAsync(RpcRequest request, RpcResponse response) + { + response.RequestId = request.RequestId; + await _runtime.SendResponse(response); + } + + public async ValueTask SendRequestAsync(AgentBase agent, RpcRequest request) + { + await _runtime.SendRequest(agent, request); + } + + public async ValueTask PublishEventAsync(RpcEvent @event) + { + await _runtime.PublishEvent(@event); + } +} diff --git a/dotnet/src/Microsoft.AI.Agents.Worker.Client/AgentId.cs b/dotnet/src/Microsoft.AI.Agents.Worker.Client/AgentId.cs new file mode 100644 index 000000000..7fd59fde3 --- /dev/null +++ b/dotnet/src/Microsoft.AI.Agents.Worker.Client/AgentId.cs @@ -0,0 +1,15 @@ +using RpcAgentId = Agents.AgentId; + +namespace Microsoft.AI.Agents.Worker.Client; + +public sealed record class AgentId(string Name, string Namespace) +{ + public static implicit operator RpcAgentId(AgentId agentId) => new() + { + Name = agentId.Name, + Namespace = agentId.Namespace + }; + + public static implicit operator AgentId(RpcAgentId agentId) => new(agentId.Name, agentId.Namespace); + public override string ToString() => $"{Name}/{Namespace}"; +} diff --git a/dotnet/src/Microsoft.AI.Agents.Worker.Client/AgentWorkerRuntime.cs b/dotnet/src/Microsoft.AI.Agents.Worker.Client/AgentWorkerRuntime.cs new file mode 100644 index 000000000..7d513e87f --- /dev/null +++ b/dotnet/src/Microsoft.AI.Agents.Worker.Client/AgentWorkerRuntime.cs @@ -0,0 +1,243 @@ +using Agents; +using Grpc.Core; +using Microsoft.Extensions.Hosting; +using System.Collections.Concurrent; +using RpcEvent = Agents.Event; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.DependencyInjection; + +namespace Microsoft.AI.Agents.Worker.Client; + +public static class HostBuilderExtensions +{ + public static AgentApplicationBuilder AddAgentWorker(this IHostApplicationBuilder builder, string agentServiceAddress) + { + builder.Services.AddGrpcClient(options => options.Address = new Uri(agentServiceAddress)); + builder.Services.AddSingleton(); + builder.Services.AddSingleton(sp => sp.GetRequiredService()); + return new AgentApplicationBuilder(builder); + } +} + +public sealed class AgentApplicationBuilder(IHostApplicationBuilder builder) +{ + public AgentApplicationBuilder AddAgent(string typeName) where TAgent : AgentBase + { + builder.Services.AddKeyedSingleton("AgentTypes", (sp, key) => Tuple.Create(typeName, typeof(TAgent))); + return this; + } +} + +public sealed class AgentWorkerRuntime( + AgentRpc.AgentRpcClient client, + IHostApplicationLifetime hostApplicationLifetime, + IServiceProvider serviceProvider, + [FromKeyedServices("AgentTypes")] IEnumerable> agentTypes, + ILogger logger) : IHostedService, IDisposable +{ + private readonly object _channelLock = new(); + private readonly ConcurrentDictionary _agentTypes = new(); + private readonly ConcurrentDictionary<(string Type, string Key), AgentBase> _agents = new(); + private readonly ConcurrentDictionary _pendingRequests = new(); + + private AsyncDuplexStreamingCall? _channel; + + private Task? _runTask; + + public void Dispose() + { + _channel?.Dispose(); + } + + private async Task RunMessagePump() + { + while (!hostApplicationLifetime.ApplicationStopping.IsCancellationRequested) + { + var channel = GetChannel(); + try + { + await foreach (var message in channel.ResponseStream.ReadAllAsync(hostApplicationLifetime.ApplicationStopping)) + { + switch (message.MessageCase) + { + case Message.MessageOneofCase.Request: + GetOrActivateAgent(message.Request.Target).ReceiveMessage(message); + break; + case Message.MessageOneofCase.Response: + if (!_pendingRequests.TryRemove(message.Response.RequestId, out var request)) + { + throw new InvalidOperationException($"Unexpected response '{message.Response}'"); + } + + message.Response.RequestId = request.OriginalRequestId; + request.Agent.ReceiveMessage(message); + break; + case Message.MessageOneofCase.Event: + foreach (var agent in _agents.Values) + { + agent.ReceiveMessage(message); + } + break; + default: + throw new InvalidOperationException($"Unexpected message '{message}'."); + } + } + } + catch (Exception ex) + { + logger.LogError(ex, "Error reading from channel."); + RecreateChannel(channel); + } + } + } + + private AgentBase GetOrActivateAgent(AgentId agentId) + { + if (!_agents.TryGetValue((agentId.Name, agentId.Namespace), out var agent)) + { + if (_agentTypes.TryGetValue(agentId.Name, out var agentType)) + { + var context = new AgentContext(agentId, this, serviceProvider.GetRequiredService>()); + agent = (AgentBase)ActivatorUtilities.CreateInstance(serviceProvider, agentType, context); + _agents.TryAdd((agentId.Name, agentId.Namespace), agent); + } + else + { + throw new InvalidOperationException($"Agent type '{agentId.Name}' is unknown."); + } + } + + return agent; + } + + private async ValueTask RegisterAgentType(string type, Type agentType) + { + if (_agentTypes.TryAdd(type, agentType)) + { + await WriteChannelAsync(new Message + { + RegisterAgentType = new RegisterAgentType + { + Type = type, + } + }); + } + } + + public async ValueTask SendResponse(RpcResponse response) + { + await WriteChannelAsync(new Message { Response = response }); + } + + public async ValueTask SendRequest(AgentBase agent, RpcRequest request) + { + var requestId = Guid.NewGuid().ToString(); + _pendingRequests[requestId] = (agent, request.RequestId); + request.RequestId = requestId; + await WriteChannelAsync(new Message { Request = request }); + } + + public async ValueTask PublishEvent(RpcEvent @event) + { + await WriteChannelAsync(new Message { Event = @event }); + } + + private async Task WriteChannelAsync(Message message) + { + var channel = GetChannel(); + try + { + await channel.RequestStream.WriteAsync(message); + } + catch (Exception ex) + { + logger.LogError(ex, "Exception writing to channel."); + RecreateChannel(channel); + } + } + + private AsyncDuplexStreamingCall GetChannel() + { + if (_channel is { } channel) + { + return channel; + } + + lock (_channelLock) + { + if (_channel is not null) + { + return _channel; + } + + return RecreateChannel(null); + } + } + + private AsyncDuplexStreamingCall RecreateChannel(AsyncDuplexStreamingCall? channel) + { + if (_channel is null || _channel == channel) + { + lock (_channelLock) + { + if (_channel is null || _channel == channel) + { + _channel?.Dispose(); + _channel = client.OpenChannel(); + } + } + } + + return _channel; + } + + public async Task StartAsync(CancellationToken cancellationToken) + { + _channel = GetChannel(); + _runTask = Start(); + + var tasks = new List(_agentTypes.Count); + foreach (var (typeName, type) in agentTypes) + { + tasks.Add(RegisterAgentType(typeName, type).AsTask()); + } + + await Task.WhenAll(tasks); + } + + internal Task Start() + { + var didSuppress = false; + if (!ExecutionContext.IsFlowSuppressed()) + { + didSuppress = true; + ExecutionContext.SuppressFlow(); + } + + try + { + return Task.Run(RunMessagePump); + } + finally + { + if (didSuppress) + { + ExecutionContext.RestoreFlow(); + } + } + } + + public async Task StopAsync(CancellationToken cancellationToken) + { + lock (_channelLock) + { + _channel?.Dispose(); + } + + if (_runTask is { } task) + { + await task; + } + } +} + diff --git a/dotnet/src/Microsoft.AI.Agents.Worker.Client/IAgentContext.cs b/dotnet/src/Microsoft.AI.Agents.Worker.Client/IAgentContext.cs new file mode 100644 index 000000000..6b6ebe257 --- /dev/null +++ b/dotnet/src/Microsoft.AI.Agents.Worker.Client/IAgentContext.cs @@ -0,0 +1,15 @@ +using Agents; +using RpcEvent = Agents.Event; +using Microsoft.Extensions.Logging; + +namespace Microsoft.AI.Agents.Worker.Client; + +public interface IAgentContext +{ + AgentId AgentId { get; } + AgentBase? AgentInstance { get; set; } + ILogger Logger { get; } + ValueTask SendResponseAsync(RpcRequest request, RpcResponse response); + ValueTask SendRequestAsync(AgentBase agent, RpcRequest request); + ValueTask PublishEventAsync(RpcEvent @event); +} diff --git a/dotnet/src/Microsoft.AI.Agents.Worker.Client/Microsoft.AI.Agents.Worker.Client.csproj b/dotnet/src/Microsoft.AI.Agents.Worker.Client/Microsoft.AI.Agents.Worker.Client.csproj new file mode 100644 index 000000000..08ea9d90c --- /dev/null +++ b/dotnet/src/Microsoft.AI.Agents.Worker.Client/Microsoft.AI.Agents.Worker.Client.csproj @@ -0,0 +1,27 @@ + + + + net8.0 + enable + enable + + + + + + + + + + + + + + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + + diff --git a/dotnet/src/Microsoft.AI.Agents.Worker.Server/AgentStateGrain.cs b/dotnet/src/Microsoft.AI.Agents.Worker.Server/AgentStateGrain.cs new file mode 100644 index 000000000..4f3fad31b --- /dev/null +++ b/dotnet/src/Microsoft.AI.Agents.Worker.Server/AgentStateGrain.cs @@ -0,0 +1,22 @@ +using Orleans.Runtime; + +namespace Microsoft.AI.Agents.Worker; + +internal sealed class AgentStateGrain([PersistentState("state", "agent-state")] IPersistentState> state) : Grain, IAgentStateGrain +{ + public ValueTask<(Dictionary State, string ETag)> ReadStateAsync() + { + return new((state.State, state.Etag)); + } + + public async ValueTask WriteStateAsync(Dictionary value, string eTag) + { + if (string.Equals(state.Etag, eTag, StringComparison.Ordinal)) + { + state.State = value; + await state.WriteStateAsync(); + } + + return state.Etag; + } +} diff --git a/dotnet/src/Microsoft.AI.Agents.Worker.Server/AgentWorkerHostingExtensions.cs b/dotnet/src/Microsoft.AI.Agents.Worker.Server/AgentWorkerHostingExtensions.cs new file mode 100644 index 000000000..67544ca75 --- /dev/null +++ b/dotnet/src/Microsoft.AI.Agents.Worker.Server/AgentWorkerHostingExtensions.cs @@ -0,0 +1,28 @@ +using Microsoft.AspNetCore.Builder; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.DependencyInjection; +using Orleans.Serialization; + +namespace Microsoft.AI.Agents.Worker; + +public static class AgentWorkerHostingExtensions +{ + public static IHostApplicationBuilder AddAgentService(this IHostApplicationBuilder builder) + { + builder.Services.AddGrpc(); + builder.Services.AddSerializer(serializer => serializer.AddProtobufSerializer()); + + // Ensure Orleans is added before the hosted service to guarantee that it starts first. + builder.UseOrleans(); + builder.Services.AddSingleton(); + builder.Services.AddSingleton(sp => sp.GetRequiredService()); + + return builder; + } + + public static WebApplication MapAgentService(this WebApplication app) + { + app.MapGrpcService(); + return app; + } +} diff --git a/dotnet/src/Microsoft.AI.Agents.Worker.Server/AgentWorkerRegistryGrain.cs b/dotnet/src/Microsoft.AI.Agents.Worker.Server/AgentWorkerRegistryGrain.cs new file mode 100644 index 000000000..32329518b --- /dev/null +++ b/dotnet/src/Microsoft.AI.Agents.Worker.Server/AgentWorkerRegistryGrain.cs @@ -0,0 +1,150 @@ +using Agents; + +namespace Microsoft.AI.Agents.Worker; + +public sealed class AgentWorkerRegistryGrain : Grain, IAgentWorkerRegistryGrain +{ + // TODO: use persistent state for some of these or (better) extend Orleans to implement some of this natively. + private readonly Dictionary _workerStates = []; + private readonly Dictionary> _supportedAgentTypes = []; + private readonly Dictionary<(string Type, string Key), IWorkerGateway> _agentDirectory = []; + private readonly TimeSpan _agentTimeout = TimeSpan.FromMinutes(1); + + public override Task OnActivateAsync(CancellationToken cancellationToken) + { + RegisterTimer(static state => ((AgentWorkerRegistryGrain)state)!.PurgeInactiveWorkers(), this, TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(30)); + return base.OnActivateAsync(cancellationToken); + } + + private Task PurgeInactiveWorkers() + { + foreach (var (worker, state) in _workerStates) + { + if (DateTimeOffset.UtcNow - state.LastSeen > _agentTimeout) + { + _workerStates.Remove(worker); + foreach (var type in state.SupportedTypes) + { + if (_supportedAgentTypes.TryGetValue(type, out var workers)) + { + workers.Remove(worker); + } + } + } + } + + return Task.CompletedTask; + } + + public ValueTask AddWorker(IWorkerGateway worker) + { + GetOrAddWorker(worker); + return ValueTask.CompletedTask; + } + + private WorkerState GetOrAddWorker(IWorkerGateway worker) + { + if (!_workerStates.TryGetValue(worker, out var workerState)) + { + workerState = _workerStates[worker] = new(); + } + + workerState.LastSeen = DateTimeOffset.UtcNow; + return workerState; + } + + public ValueTask RegisterAgentType(string type, IWorkerGateway worker) + { + if (!_supportedAgentTypes.TryGetValue(type, out var supportedAgentTypes)) + { + supportedAgentTypes = _supportedAgentTypes[type] = []; + } + + if (!supportedAgentTypes.Contains(worker)) + { + supportedAgentTypes.Add(worker); + } + + var workerState = GetOrAddWorker(worker); + workerState.SupportedTypes.Add(type); + + return ValueTask.CompletedTask; + } + + public ValueTask RemoveWorker(IWorkerGateway worker) + { + if (_workerStates.Remove(worker, out var state)) + { + foreach (var type in state.SupportedTypes) + { + if (_supportedAgentTypes.TryGetValue(type, out var workers)) + { + workers.Remove(worker); + } + } + } + + return ValueTask.CompletedTask; + } + + public ValueTask UnregisterAgentType(string type, IWorkerGateway worker) + { + if (_workerStates.TryGetValue(worker, out var state)) + { + state.SupportedTypes.Remove(type); + } + + if (_supportedAgentTypes.TryGetValue(type, out var workers)) + { + workers.Remove(worker); + } + + return ValueTask.CompletedTask; + } + + public ValueTask GetCompatibleWorker(string type) => new(GetCompatibleWorkerCore(type)); + + private IWorkerGateway? GetCompatibleWorkerCore(string type) + { + if (_supportedAgentTypes.TryGetValue(type, out var workers)) + { + // Return a random compatible worker. + return workers[Random.Shared.Next(workers.Count)]; + } + + return null; + } + + public ValueTask<(IWorkerGateway? Gateway, bool NewPlacment)> GetOrPlaceAgent(AgentId agentId) + { + bool isNewPlacement; + if (!_agentDirectory.TryGetValue((agentId.Name, agentId.Namespace), out var worker) || !_workerStates.ContainsKey(worker)) + { + worker = GetCompatibleWorkerCore(agentId.Name); + if (worker is not null) + { + // New activation. + _agentDirectory[(agentId.Name, agentId.Namespace)] = worker; + isNewPlacement = true; + } + else + { + // No activation, and failed to place. + isNewPlacement = false; + } + } + else + { + // Existing activation. + isNewPlacement = false; + } + + return new((worker, isNewPlacement)); + } + + private sealed class WorkerState + { + public HashSet SupportedTypes { get; set; } = []; + public DateTimeOffset LastSeen { get; set; } + } +} diff --git a/dotnet/src/Microsoft.AI.Agents.Worker.Server/IAgentStateGrain.cs b/dotnet/src/Microsoft.AI.Agents.Worker.Server/IAgentStateGrain.cs new file mode 100644 index 000000000..f5a10f7d3 --- /dev/null +++ b/dotnet/src/Microsoft.AI.Agents.Worker.Server/IAgentStateGrain.cs @@ -0,0 +1,7 @@ +namespace Microsoft.AI.Agents.Worker; + +internal interface IAgentStateGrain : IGrainWithStringKey +{ + ValueTask<(Dictionary State, string ETag)> ReadStateAsync(); + ValueTask WriteStateAsync(Dictionary state, string eTag); +} diff --git a/dotnet/src/Microsoft.AI.Agents.Worker.Server/IAgentWorkerRegistryGrain.cs b/dotnet/src/Microsoft.AI.Agents.Worker.Server/IAgentWorkerRegistryGrain.cs new file mode 100644 index 000000000..09fb4c805 --- /dev/null +++ b/dotnet/src/Microsoft.AI.Agents.Worker.Server/IAgentWorkerRegistryGrain.cs @@ -0,0 +1,12 @@ +using Agents; + +namespace Microsoft.AI.Agents.Worker; + +public interface IAgentWorkerRegistryGrain : IGrainWithIntegerKey +{ + ValueTask RegisterAgentType(string type, IWorkerGateway worker); + ValueTask UnregisterAgentType(string type, IWorkerGateway worker); + ValueTask AddWorker(IWorkerGateway worker); + ValueTask RemoveWorker(IWorkerGateway worker); + ValueTask<(IWorkerGateway? Gateway, bool NewPlacment)> GetOrPlaceAgent(AgentId agentId); +} diff --git a/dotnet/src/Microsoft.AI.Agents.Worker.Server/IWorkerGateway.cs b/dotnet/src/Microsoft.AI.Agents.Worker.Server/IWorkerGateway.cs new file mode 100644 index 000000000..1477f855a --- /dev/null +++ b/dotnet/src/Microsoft.AI.Agents.Worker.Server/IWorkerGateway.cs @@ -0,0 +1,10 @@ +using Agents; +using RpcEvent = Agents.Event; + +namespace Microsoft.AI.Agents.Worker; + +public interface IWorkerGateway : IGrainObserver +{ + ValueTask InvokeRequest(RpcRequest request); + ValueTask BroadcastEvent(RpcEvent evt); +} diff --git a/dotnet/src/Microsoft.AI.Agents.Worker.Server/Microsoft.AI.Agents.Worker.Server.csproj b/dotnet/src/Microsoft.AI.Agents.Worker.Server/Microsoft.AI.Agents.Worker.Server.csproj new file mode 100644 index 000000000..91fcb44ab --- /dev/null +++ b/dotnet/src/Microsoft.AI.Agents.Worker.Server/Microsoft.AI.Agents.Worker.Server.csproj @@ -0,0 +1,25 @@ + + + + net8.0 + enable + enable + + + + + + + + + + + + + + + + + + + diff --git a/dotnet/src/Microsoft.AI.Agents.Worker.Server/WorkerGateway.cs b/dotnet/src/Microsoft.AI.Agents.Worker.Server/WorkerGateway.cs new file mode 100644 index 000000000..bac646c93 --- /dev/null +++ b/dotnet/src/Microsoft.AI.Agents.Worker.Server/WorkerGateway.cs @@ -0,0 +1,273 @@ +using Grpc.Core; +using Microsoft.Extensions.Hosting; +using Orleans.Runtime; +using Agents; +using Orleans.Streams; +using Event = Microsoft.AI.Agents.Abstractions.Event; +using RpcEvent = Agents.Event; +using Microsoft.Extensions.Logging; +using System.Collections.Concurrent; +using System.Text.Json; + +namespace Microsoft.AI.Agents.Worker; + +internal sealed class WorkerGateway : BackgroundService, IWorkerGateway +{ + private static readonly TimeSpan s_agentResponseTimeout = TimeSpan.FromSeconds(30); + + private readonly ILogger _logger; + private readonly IClusterClient _clusterClient; + private readonly IAgentWorkerRegistryGrain _gatewayRegistry; + private readonly IWorkerGateway _reference; + + // The local mapping of agents to worker processes. + private readonly ConcurrentDictionary _workers = new(); + + // The agents supported by each worker process. + private readonly ConcurrentDictionary> _supportedAgentTypes = []; + + // The mapping from agent id to worker process. + private readonly ConcurrentDictionary<(string Type, string Key), WorkerProcessConnection> _agentDirectory = new(); + + // RPC + private readonly ConcurrentDictionary<(WorkerProcessConnection, string), TaskCompletionSource> _pendingRequests = new(); + + public WorkerGateway(IClusterClient clusterClient, ILogger logger) + { + _logger = logger; + _clusterClient = clusterClient; + _reference = clusterClient.CreateObjectReference(this); + _gatewayRegistry = clusterClient.GetGrain(0); + } + + public async ValueTask BroadcastEvent(RpcEvent evt) + { + var tasks = new List(_agentDirectory.Count); + foreach (var (_, connection) in _agentDirectory) + { + tasks.Add(connection.SendMessage(new Message { Event = evt })); + } + + await Task.WhenAll(tasks); + } + + public async ValueTask InvokeRequest(RpcRequest request) + { + (string Type, string Key) agentId = (request.Target.Name, request.Target.Namespace); + if (!_agentDirectory.TryGetValue(agentId, out var connection) || connection.Completion.IsCompleted) + { + // Activate the agent on a compatible worker process. + if (_supportedAgentTypes.TryGetValue(request.Target.Name, out var workers)) + { + connection = workers[Random.Shared.Next(workers.Count)]; + _agentDirectory[agentId] = connection; + } + else + { + return new(new RpcResponse { Error = "Agent not found." }); + } + } + + // Proxy the request to the agent. + var originalRequestId = request.RequestId; + var newRequestId = Guid.NewGuid().ToString(); + var completion = _pendingRequests[(connection, newRequestId)] = new(TaskCreationOptions.RunContinuationsAsynchronously); + request.RequestId = newRequestId; + await connection.ResponseStream.WriteAsync(new Message { Request = request }); + + // Wait for the response and send it back to the caller. + var response = await completion.Task.WaitAsync(s_agentResponseTimeout); + response.RequestId = originalRequestId; + return response; + } + + void DispatchResponse(WorkerProcessConnection connection, RpcResponse response) + { + if (!_pendingRequests.TryRemove((connection, response.RequestId), out var completion)) + { + _logger.LogWarning("Received response for unknown request."); + return; + } + + // Complete the request. + completion.SetResult(response); + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + while (!stoppingToken.IsCancellationRequested) + { + try + { + await _gatewayRegistry.AddWorker(_reference); + } + catch (Exception exception) + { + _logger.LogWarning(exception, "Error adding worker to registry."); + } + + await Task.Delay(TimeSpan.FromSeconds(15), stoppingToken); + } + + try + { + await _gatewayRegistry.RemoveWorker(_reference); + } + catch (Exception exception) + { + _logger.LogWarning(exception, "Error removing worker from registry."); + } + } + + internal async Task OnReceivedMessageAsync(WorkerProcessConnection connection, Message message) + { + _logger.LogInformation("Received message {Message} from connection {Connection}.", message, connection); + switch (message.MessageCase) + { + case Message.MessageOneofCase.Request: + await DispatchRequestAsync(connection, message.Request); + break; + case Message.MessageOneofCase.Response: + DispatchResponse(connection, message.Response); + break; + case Message.MessageOneofCase.Event: + await DispatchEventAsync(message.Event); + break; + case Message.MessageOneofCase.RegisterAgentType: + await RegisterAgentTypeAsync(connection, message.RegisterAgentType); + break; + default: + throw new InvalidOperationException($"Unknown message type for message '{message}'."); + }; + } + + async ValueTask RegisterAgentTypeAsync(WorkerProcessConnection connection, RegisterAgentType msg) + { + connection.AddSupportedType(msg.Type); + _supportedAgentTypes.GetOrAdd(msg.Type, _ => []).Add(connection); + + await _clusterClient.GetGrain(0).RegisterAgentType(msg.Type, _reference); + } + + async ValueTask DispatchEventAsync(RpcEvent evt) + { + var topic = _clusterClient.GetStreamProvider("agents").GetStream(StreamId.Create(evt.Namespace, evt.Type)); + await topic.OnNextAsync(evt.ToEvent()); + } + + async ValueTask DispatchRequestAsync(WorkerProcessConnection connection, RpcRequest request) + { + var requestId = request.RequestId; + if (request.Target is null) + { + throw new InvalidOperationException($"Request message is missing a target. Message: '{request}'."); + } + + if (string.Equals("runtime", request.Target.Name, StringComparison.Ordinal)) + { + if (string.Equals("subscribe", request.Method)) + { + await InvokeRequestDelegate(connection, request, async (_) => + { + await SubscribeToTopic(connection, request); + return new RpcResponse { Result = "Ok" }; + }); + + return; + } + } + else + { + await InvokeRequestDelegate(connection, request, async request => + { + var (gateway, isPlacement) = await _gatewayRegistry.GetOrPlaceAgent(request.Target); + if (gateway is null) + { + return new RpcResponse { Error = "Agent not found and no compatible gateways were found." }; + } + + if (isPlacement) + { + // Activate the worker: load state + // TODO + } + + // Forward the message to the gateway and return the result. + return await gateway.InvokeRequest(request); + }); + } + } + + private static async Task InvokeRequestDelegate(WorkerProcessConnection connection, RpcRequest request, Func> func) + { + try + { + var response = await func(request); + response.RequestId = request.RequestId; + await connection.ResponseStream.WriteAsync(new Message { Response = response }); + } + catch (Exception ex) + { + await connection.ResponseStream.WriteAsync(new Message { Response = new RpcResponse { RequestId = request.RequestId, Error = ex.Message } }); + } + } + + private async ValueTask SubscribeToTopic(WorkerProcessConnection connection, RpcRequest request) + { + // Subscribe to a topic + var parameters = JsonSerializer.Deserialize>(request.Data) + ?? throw new ArgumentException($"Request data does not contain required payload format: {{\"namespace\": \"string\", \"type\": \"string\"}}."); + var ns = parameters["namespace"]; + var type = parameters["type"]; + var topic = _clusterClient.GetStreamProvider("agents").GetStream(StreamId.Create(ns: type, key: ns)); + await topic.SubscribeAsync(OnNextAsync); + return; + + async Task OnNextAsync(IList> items) + { + foreach (var item in items) + { + var evt = item.Item.ToRpcEvent(); + evt.Namespace = ns; + evt.Type = evt.Type; + var payload = new Dictionary + { + ["sequenceId"] = item.Token.SequenceNumber.ToString(), + ["eventIdx"] = item.Token.EventIndex.ToString() + }; + evt.Data = JsonSerializer.Serialize(payload); + await connection.ResponseStream.WriteAsync(new Message { Event = evt }); + } + } + } + + internal Task ConnectToWorkerProcess(IAsyncStreamReader requestStream, IServerStreamWriter responseStream, ServerCallContext context) + { + _logger.LogInformation("Received new connection from {Peer}.", context.Peer); + var workerProcess = new WorkerProcessConnection(this, requestStream, responseStream, context); + _workers[workerProcess] = workerProcess; + return workerProcess.Completion; + } + + internal void OnRemoveWorkerProcess(WorkerProcessConnection workerProcess) + { + _workers.TryRemove(workerProcess, out _); + var types = workerProcess.GetSupportedTypes(); + foreach (var type in types) + { + if (_supportedAgentTypes.TryGetValue(type, out var supported)) + { + supported.Remove(workerProcess); + } + } + + // Any agents activated on that worker are also gone. + foreach (var pair in _agentDirectory) + { + if (pair.Value == workerProcess) + { + ((IDictionary<(string Type, string Key), WorkerProcessConnection>)_agentDirectory).Remove(pair); + } + } + } +} diff --git a/dotnet/src/Microsoft.AI.Agents.Worker.Server/WorkerGatewayService.cs b/dotnet/src/Microsoft.AI.Agents.Worker.Server/WorkerGatewayService.cs new file mode 100644 index 000000000..934289a78 --- /dev/null +++ b/dotnet/src/Microsoft.AI.Agents.Worker.Server/WorkerGatewayService.cs @@ -0,0 +1,13 @@ +using Grpc.Core; +using Agents; + +namespace Microsoft.AI.Agents.Worker; + +// gRPC service which handles communication between the agent worker and the cluster. +internal sealed class WorkerGatewayService(WorkerGateway agentWorker) : AgentRpc.AgentRpcBase +{ + public override async Task OpenChannel(IAsyncStreamReader requestStream, IServerStreamWriter responseStream, ServerCallContext context) + { + await agentWorker.ConnectToWorkerProcess(requestStream, responseStream, context); + } +} diff --git a/dotnet/src/Microsoft.AI.Agents.Worker.Server/WorkerProcessConnection.cs b/dotnet/src/Microsoft.AI.Agents.Worker.Server/WorkerProcessConnection.cs new file mode 100644 index 000000000..ea1540053 --- /dev/null +++ b/dotnet/src/Microsoft.AI.Agents.Worker.Server/WorkerProcessConnection.cs @@ -0,0 +1,98 @@ +using Grpc.Core; +using Agents; + +namespace Microsoft.AI.Agents.Worker; + +internal sealed class WorkerProcessConnection : IAsyncDisposable +{ + private static long s_nextConnectionId; + private readonly string _connectionId = Interlocked.Increment(ref s_nextConnectionId).ToString(); + private readonly object _lock = new(); + private readonly HashSet _supportedTypes = []; + private readonly WorkerGateway _gateway; + private readonly CancellationTokenSource _shutdownCancellationToken = new(); + + public WorkerProcessConnection(WorkerGateway agentWorker, IAsyncStreamReader requestStream, IServerStreamWriter responseStream, ServerCallContext context) + { + _gateway = agentWorker; + RequestStream = requestStream; + ResponseStream = responseStream; + ServerCallContext = context; + Completion = Start(); + } + + public IAsyncStreamReader RequestStream { get; } + public IServerStreamWriter ResponseStream { get; } + public ServerCallContext ServerCallContext { get; } + + public void AddSupportedType(string type) + { + lock (_lock) + { + _supportedTypes.Add(type); + } + } + + public HashSet GetSupportedTypes() + { + lock (_lock) + { + return new HashSet(_supportedTypes); + } + } + + public async Task SendMessage(Message message) + { + await ResponseStream.WriteAsync(message); + } + + public Task Completion { get; } + + private Task Start() + { + var didSuppress = false; + if (!ExecutionContext.IsFlowSuppressed()) + { + didSuppress = true; + ExecutionContext.SuppressFlow(); + } + + try + { + return Task.Run(Run); + } + finally + { + if (didSuppress) + { + ExecutionContext.RestoreFlow(); + } + } + } + + public async Task Run() + { + await Task.CompletedTask.ConfigureAwait(ConfigureAwaitOptions.ForceYielding); + try + { + await foreach (var message in RequestStream.ReadAllAsync(_shutdownCancellationToken.Token)) + { + + // Fire and forget + _gateway.OnReceivedMessageAsync(this, message).Ignore(); + } + } + finally + { + _gateway.OnRemoveWorkerProcess(this); + } + } + + public async ValueTask DisposeAsync() + { + _shutdownCancellationToken.Cancel(); + await Completion.ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing); + } + + public override string ToString() => $"Connection-{_connectionId}"; +} diff --git a/dotnet/src/Microsoft.AI.Agents/Abstractions/Event.cs b/dotnet/src/Microsoft.AI.Agents/Abstractions/Event.cs index 5924ca307..a063d5f33 100644 --- a/dotnet/src/Microsoft.AI.Agents/Abstractions/Event.cs +++ b/dotnet/src/Microsoft.AI.Agents/Abstractions/Event.cs @@ -6,6 +6,7 @@ namespace Microsoft.AI.Agents.Abstractions; public class Event { public required Dictionary Data { get; set; } + public required string Namespace { get; set; } public required string Type { get; set; } public string Subject { get; set; } = ""; } diff --git a/dotnet/src/Microsoft.AI.Agents/Abstractions/IAgent.cs b/dotnet/src/Microsoft.AI.Agents/Abstractions/IAgent.cs index d05c4c37a..157daa9d5 100644 --- a/dotnet/src/Microsoft.AI.Agents/Abstractions/IAgent.cs +++ b/dotnet/src/Microsoft.AI.Agents/Abstractions/IAgent.cs @@ -3,5 +3,5 @@ namespace Microsoft.AI.Agents.Abstractions; public interface IAgent { Task HandleEvent(Event item); - Task PublishEvent(string ns, string id, Event item); -} \ No newline at end of file + Task PublishEvent(Event item); +} diff --git a/dotnet/src/Shared/RpcEventExtensions.cs b/dotnet/src/Shared/RpcEventExtensions.cs new file mode 100644 index 000000000..330a14970 --- /dev/null +++ b/dotnet/src/Shared/RpcEventExtensions.cs @@ -0,0 +1,42 @@ +using System.Text.Json; +using Event = Microsoft.AI.Agents.Abstractions.Event; +using RpcEvent = Agents.Event; + +namespace Microsoft.AI.Agents.Worker; + +public static class RpcEventExtensions +{ + public static RpcEvent ToRpcEvent(this Event input) + { + var result = new RpcEvent + { + Namespace = input.Namespace, + Type = input.Type, + }; + + if (input.Data is not null) + { + result.Data = JsonSerializer.Serialize(input.Data); + } + + return result; + } + + public static Event ToEvent(this RpcEvent input) + { + var result = new Event + { + Type = input.Type, + Subject = input.Namespace, + Namespace = input.Namespace, + Data = [] + }; + + if (input.Data is not null) + { + result.Data = JsonSerializer.Deserialize>(input.Data)!; + } + + return result; + } +} diff --git a/protos/agent_worker.proto b/protos/agent_worker.proto new file mode 100644 index 000000000..a71da103f --- /dev/null +++ b/protos/agent_worker.proto @@ -0,0 +1,49 @@ +syntax = "proto3"; + +package agents; + +message AgentId { + string name = 1; + string namespace = 2; +} + +message RpcRequest { + string request_id = 1; + AgentId source = 2; + AgentId target = 3; + string method = 4; + string data = 5; + map metadata = 6; +} + +message RpcResponse { + string request_id = 1; + string result = 2; + string error = 3; + map metadata = 4; +} + +message Event { + string namespace = 1; + string type = 2; + string data = 3; + map metadata = 4; +} + +message RegisterAgentType { + string type = 1; +} + +service AgentRpc { + rpc OpenChannel (stream Message) returns (stream Message); +} + +message Message { + oneof message { + RpcRequest request = 1; + RpcResponse response = 2; + Event event = 3; + RegisterAgentType registerAgentType = 4; + } +} + diff --git a/python/.gitignore b/python/.gitignore index 82455027b..640f8babf 100644 --- a/python/.gitignore +++ b/python/.gitignore @@ -163,3 +163,6 @@ cython_debug/ /docs/src/reference .DS_Store + +# Generated proto files +src/agnext/worker/protos/agent* \ No newline at end of file diff --git a/python/.vscode/launch.json b/python/.vscode/launch.json new file mode 100644 index 000000000..969d73695 --- /dev/null +++ b/python/.vscode/launch.json @@ -0,0 +1,16 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Python Debugger: Current File", + "type": "debugpy", + "request": "launch", + "program": "${file}", + "console": "integratedTerminal", + "justMyCode": false + } + ] +} \ No newline at end of file diff --git a/python/docs/src/conf.py b/python/docs/src/conf.py index f0e1a7a63..6cbbbb9c6 100644 --- a/python/docs/src/conf.py +++ b/python/docs/src/conf.py @@ -28,6 +28,7 @@ apidoc_template_dir = '_apidoc_templates' apidoc_separate_modules = True apidoc_extra_args = ["--no-toc"] napoleon_custom_sections = [('Returns', 'params_style')] +apidoc_excluded_paths = ["./worker/protos/"] templates_path = [] exclude_patterns = ["reference/agnext.rst"] diff --git a/python/docs/src/core-concepts/agent.md b/python/docs/src/core-concepts/agent.md index d224735d9..c43ef9f97 100644 --- a/python/docs/src/core-concepts/agent.md +++ b/python/docs/src/core-concepts/agent.md @@ -15,6 +15,10 @@ Generally, messages are one of: Messages are purely data, and should not contain any logic. +```{tip} +It is *strongly* recommended that messages are Pydantic models. This allows for easy serialization and deserialization of messages, and provides a clear schema for the message. +``` +