Files
2025-05-18 13:04:45 +08:00

275 lines
9.3 KiB
C#

// Copyright Epic Games, Inc. All Rights Reserved.
using System.Diagnostics;
using EpicGames.Core;
using EpicGames.Horde.Agents.Leases;
using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;
using HordeAgent.Services;
using HordeCommon.Rpc;
using HordeCommon.Rpc.Messages;
using Microsoft.Extensions.Logging;
using OpenTelemetry.Trace;
using ByteString = Google.Protobuf.ByteString;
namespace HordeAgent.Leases
{
/// <summary>
/// Base class for objects which execute a specific type of lease. Tracks the current RPC state of
/// the lease, owns the cancellation source used to abort it, and provides helpers for running
/// child processes whose output is forwarded to a logger.
/// </summary>
abstract class LeaseHandler : IDisposable
{
    /// <summary>
    /// Identifier for this lease
    /// </summary>
    public LeaseId Id { get; }

    /// <summary>
    /// The RPC lease state. Replaced with an updated copy once execution of the lease has finished.
    /// </summary>
    public RpcLease RpcLease { get; set; }

    /// <summary>
    /// Payload from the lease, as captured when the handler was constructed
    /// </summary>
    public Any RpcPayload { get; }

    /// <summary>
    /// Result from executing the lease. Until <see cref="Start"/> is called, this is a faulted task
    /// carrying an <see cref="InvalidOperationException"/>.
    /// </summary>
    public Task<LeaseResult> Result { get; private set; }

    /// <summary>
    /// Whether the lease has been cancelled
    /// </summary>
    public bool IsCancelled => _cancellationSource.IsCancellationRequested;

    /// <summary>
    /// Reason for cancellation, or "unknown" if no reason has been recorded
    /// </summary>
    public string CancellationReason => _cancellationReason ?? "unknown";

    readonly CancellationTokenSource _cancellationSource = new CancellationTokenSource();
    string? _cancellationReason;

    /// <summary>
    /// Constructor
    /// </summary>
    /// <param name="rpcLease">The lease to be handled</param>
    protected LeaseHandler(RpcLease rpcLease)
    {
        Id = rpcLease.Id;
        RpcLease = rpcLease;
        RpcPayload = rpcLease.Payload;
        Result = Task.FromException<LeaseResult>(new InvalidOperationException("Lease has not been started"));
    }

    /// <inheritdoc/>
    public void Dispose()
    {
        Dispose(disposing: true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Overridable dispose method
    /// </summary>
    /// <param name="disposing">True when called from <see cref="Dispose()"/>; managed resources are only released in that case</param>
    protected virtual void Dispose(bool disposing)
    {
        if (disposing)
        {
            _cancellationSource.Dispose();
        }
    }

    /// <summary>
    /// Starts executing the lease in the background. The outcome is published through <see cref="Result"/>.
    /// </summary>
    /// <param name="session">Session that the lease is running under</param>
    /// <param name="tracer">Tracer used to create telemetry spans</param>
    /// <param name="logger">Logger for messages about the lease lifecycle</param>
    /// <param name="leaseLoggerFactory">Factory for the logger passed to the lease implementation</param>
    public void Start(ISession session, Tracer tracer, ILogger logger, LeaseLoggerFactory leaseLoggerFactory)
    {
        Result = Task.Run(() => HandleLeaseAsync(session, tracer, logger, leaseLoggerFactory));
    }

    /// <summary>
    /// Cancels the lease
    /// </summary>
    /// <param name="reason">Reason for the cancellation. Only the first reason supplied is retained.</param>
    public void Cancel(string reason)
    {
        // Record the reason before signalling, and never overwrite a reason that was already set
        Interlocked.CompareExchange(ref _cancellationReason, reason, null);
        _cancellationSource.Cancel();
    }

    /// <summary>
    /// Handle a lease request. Executes the payload, then replaces <see cref="RpcLease"/> with a copy
    /// describing the final state, outcome and output. Exceptions thrown while executing the payload
    /// are logged and recorded on the telemetry span rather than being propagated.
    /// </summary>
    /// <returns>Result from executing the payload, or <see cref="LeaseResult.Failed"/> if it threw</returns>
    async Task<LeaseResult> HandleLeaseAsync(ISession session, Tracer tracer, ILogger logger, LeaseLoggerFactory leaseLoggerFactory)
    {
        using TelemetrySpan span = tracer.StartActiveSpan($"{nameof(LeaseHandler)}.{nameof(HandleLeaseAsync)}");
        span.SetAttribute("horde.lease.id", RpcLease.Id.ToString());
        span.SetAttribute("horde.agent.id", session.AgentId.ToString());
        span.SetAttribute("horde.agent.session_id", session.SessionId.ToString());

        logger.LogInformation("Handling lease {LeaseId}: {LeaseTypeUrl}", Id, RpcLease.Payload.TypeUrl);

        // Get the lease outcome. Anything other than a normal return leaves this as a failure.
        LeaseResult result = LeaseResult.Failed;
        try
        {
            span.SetAttribute("horde.lease.task", RpcPayload.TypeUrl);
            result = await HandleLeasePayloadAsync(session, tracer, leaseLoggerFactory);
        }
        catch (OperationCanceledException) when (_cancellationSource.IsCancellationRequested)
        {
            // Cancellation requested through Cancel() is an expected way for a lease to end
            logger.LogInformation("Lease {LeaseId} cancelled", Id);
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "Unhandled exception while executing lease {LeaseId}: {Message}", Id, ex.Message);
            span.SetStatus(Status.Error);
            span.RecordException(ex);
        }

        // Update the state of the lease
        RpcLease newRpcLease = new RpcLease(RpcLease);
        if (_cancellationSource.IsCancellationRequested)
        {
            // A cancellation request takes precedence over whatever the payload returned
            newRpcLease.State = RpcLeaseState.Cancelled;
            newRpcLease.Outcome = RpcLeaseOutcome.Failed;
            newRpcLease.Output = ByteString.Empty;
        }
        else
        {
            newRpcLease.State = (result.Outcome == LeaseOutcome.Cancelled) ? RpcLeaseState.Cancelled : RpcLeaseState.Completed;
            newRpcLease.Outcome = (RpcLeaseOutcome)result.Outcome;
            newRpcLease.Output = (result.Output != null) ? ByteString.CopyFrom(result.Output) : ByteString.Empty;
        }
        RpcLease = newRpcLease;

        logger.LogInformation("Transitioning lease {LeaseId} to {State}, outcome={Outcome}", Id, RpcLease.State, RpcLease.Outcome);
        span.SetAttribute("horde.lease.state", RpcLease.State.ToString());
        span.SetAttribute("horde.lease.outcome", RpcLease.Outcome.ToString());

        return result;
    }

    /// <summary>
    /// Dispatch a lease payload to the appropriate handler, using a logger created for this lease
    /// </summary>
    /// <returns>Result returned by <see cref="ExecuteAsync"/></returns>
    internal async Task<LeaseResult> HandleLeasePayloadAsync(ISession session, Tracer tracer, LeaseLoggerFactory leaseLoggerFactory)
    {
        // The logger factory is disposed once the lease has finished executing
        using ILoggerFactory loggerFactory = leaseLoggerFactory.CreateLoggerFactory(Id);
        ILogger leaseLogger = loggerFactory.CreateLogger(GetType());
        return await ExecuteAsync(session, tracer, leaseLogger, _cancellationSource.Token);
    }

    /// <summary>
    /// Executes a lease
    /// </summary>
    /// <param name="session">Session that the lease is running under</param>
    /// <param name="tracer">Tracer used to create telemetry spans</param>
    /// <param name="logger">Logger for output from the lease</param>
    /// <param name="cancellationToken">Token signalled when the lease is cancelled</param>
    /// <returns>Result for the lease</returns>
    protected abstract Task<LeaseResult> ExecuteAsync(ISession session, Tracer tracer, ILogger logger, CancellationToken cancellationToken);

    /// <summary>
    /// Runs a child process, piping the output to the given logger. Lines which parse as JSON log
    /// events are forwarded as structured events; all other lines are logged as plain information.
    /// </summary>
    /// <param name="executable">Executable to launch</param>
    /// <param name="arguments">Command line arguments for the new process</param>
    /// <param name="environment">Environment for the new process</param>
    /// <param name="logger">Logger for output</param>
    /// <param name="cancellationToken">Cancellation token for the operation</param>
    /// <returns>Exit code of the process</returns>
    /// <remarks>
    /// Failures are logged as errors and rethrown. A cancellation requested through
    /// <paramref name="cancellationToken"/> is rethrown without being logged as an error.
    /// </remarks>
    protected static async Task<int> RunProcessAsync(string executable, IEnumerable<string> arguments, IReadOnlyDictionary<string, string>? environment, ILogger logger, CancellationToken cancellationToken)
    {
        string commandLine = CommandLineArguments.Join(arguments);
        logger.LogInformation("Running child process: {Executable} {CommandLine}", CommandLineArguments.Quote(executable), commandLine);

        try
        {
            using ManagedProcessGroup processGroup = new ManagedProcessGroup();
            using ManagedProcess process = new ManagedProcess(processGroup, executable, commandLine, null, environment, ProcessPriorityClass.Normal);

            // A null line marks the end of the process output
            string? line;
            while ((line = await process.ReadLineAsync(cancellationToken)) != null)
            {
                if (JsonLogEvent.TryParse(line, out JsonLogEvent jsonLogEvent))
                {
                    logger.LogJsonLogEvent(jsonLogEvent);
                }
                else
                {
                    logger.LogInformation("{Line}", line);
                }
            }

            // Output is exhausted, so wait for the exit code without observing cancellation
            await process.WaitForExitAsync(CancellationToken.None);
            return process.ExitCode;
        }
        catch (Exception ex) when (!(ex is OperationCanceledException && cancellationToken.IsCancellationRequested))
        {
            // Requested cancellation is excluded by the filter above: it is not a failure to run the
            // process, and propagates to the caller without an error being written to the log.
            logger.LogError(ex, "Failed to run process: {Message}", ex.Message);
            throw;
        }
    }

    /// <summary>
    /// Runs a .NET assembly as a child process, piping the output to the given logger
    /// </summary>
    /// <param name="entryAssembly">Assembly to launch</param>
    /// <param name="arguments">Command line arguments for the new process</param>
    /// <param name="environment">Environment for the new process</param>
    /// <param name="useNativeHost">Whether to use a native host process</param>
    /// <param name="logger">Logger for output</param>
    /// <param name="cancellationToken">Cancellation token for the operation</param>
    /// <returns>Exit code of the process</returns>
    protected static async Task<int> RunDotNetProcessAsync(FileReference entryAssembly, IEnumerable<string> arguments, IReadOnlyDictionary<string, string>? environment, bool useNativeHost, ILogger logger, CancellationToken cancellationToken)
    {
        if (useNativeHost)
        {
            // The native host sits next to the assembly: ".exe" extension on Windows, none elsewhere
            FileReference nativeHost = entryAssembly.ChangeExtension(OperatingSystem.IsWindows() ? ".exe" : null);
            return await RunProcessAsync(nativeHost.FullName, arguments, environment, logger, cancellationToken);
        }
        else
        {
            // Launch through the "dotnet" host, passing the assembly path as the first argument
            IEnumerable<string> allArguments = arguments.Prepend(entryAssembly.FullName);
            return await RunProcessAsync("dotnet", allArguments, environment, logger, cancellationToken);
        }
    }
}
/// <summary>
/// Strongly typed variant of <see cref="LeaseHandler"/> which unpacks the lease payload into a
/// message of type <typeparamref name="T"/> before executing it
/// </summary>
/// <typeparam name="T">Type of the lease message</typeparam>
abstract class LeaseHandler<T> : LeaseHandler where T : IMessage<T>, new()
{
    /// <summary>
    /// Constructor
    /// </summary>
    /// <param name="rpcLease">The lease to be handled</param>
    protected LeaseHandler(RpcLease rpcLease) : base(rpcLease)
    {
    }

    /// <inheritdoc/>
    protected override Task<LeaseResult> ExecuteAsync(ISession session, Tracer tracer, ILogger logger, CancellationToken cancellationToken)
    {
        // Decode the payload into the concrete message type, then hand off to the typed overload
        T message = RpcLease.Payload.Unpack<T>();
        return ExecuteAsync(session, Id, message, tracer, logger, cancellationToken);
    }

    /// <inheritdoc cref="LeaseHandler.ExecuteAsync" />
    protected abstract Task<LeaseResult> ExecuteAsync(ISession session, LeaseId leaseId, T message, Tracer tracer, ILogger logger, CancellationToken cancellationToken);
}
/// <summary>
/// Lease handler which performs no work and completes immediately with a result supplied at construction
/// </summary>
class DefaultLeaseHandler : LeaseHandler
{
    readonly LeaseResult _result;

    /// <summary>
    /// Constructor
    /// </summary>
    /// <param name="lease">The lease to be handled</param>
    /// <param name="result">Result to report when the lease is executed</param>
    public DefaultLeaseHandler(RpcLease lease, LeaseResult result) : base(lease)
    {
        _result = result;
    }

    /// <inheritdoc/>
    protected override Task<LeaseResult> ExecuteAsync(ISession session, Tracer tracer, ILogger logger, CancellationToken cancellationToken)
    {
        return Task.FromResult(_result);
    }
}
}