// Copyright Epic Games, Inc. All Rights Reserved.
using System.Diagnostics;
using EpicGames.Core;
using EpicGames.Horde.Agents.Leases;
using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;
using HordeAgent.Services;
using HordeCommon.Rpc;
using HordeCommon.Rpc.Messages;
using Microsoft.Extensions.Logging;
using OpenTelemetry.Trace;
using ByteString = Google.Protobuf.ByteString;
namespace HordeAgent.Leases
{
///
/// Handles execution of a specific lease type
///
abstract class LeaseHandler : IDisposable
{
///
/// Identifier for this lease
///
public LeaseId Id { get; }
///
/// The RPC lease state
///
public RpcLease RpcLease { get; set; }
///
/// Payload from the lease
///
public Any RpcPayload { get; }
///
/// Result from executing the lease
///
public Task Result { get; private set; }
///
/// Whether the lease has been cancelled
///
public bool IsCancelled => _cancellationSource.IsCancellationRequested;
///
/// Reason for cancellation
///
public string CancellationReason => _cancellationReason ?? "unknown";
readonly CancellationTokenSource _cancellationSource = new CancellationTokenSource();
string? _cancellationReason;
///
/// Constructor
///
protected LeaseHandler(RpcLease rpcLease)
{
Id = rpcLease.Id;
RpcLease = rpcLease;
RpcPayload = rpcLease.Payload;
Result = Task.FromException(new InvalidOperationException("Lease has not been started"));
}
///
public void Dispose()
{
Dispose(disposing: true);
GC.SuppressFinalize(this);
}
///
/// Overridable dispose method
///
protected virtual void Dispose(bool disposing)
{
if (disposing)
{
_cancellationSource.Dispose();
}
}
///
/// Starts executing the lease
///
public void Start(ISession session, Tracer tracer, ILogger logger, LeaseLoggerFactory leaseLoggerFactory)
{
Result = Task.Run(() => HandleLeaseAsync(session, tracer, logger, leaseLoggerFactory));
}
///
/// Cancels the lease
///
public void Cancel(string reason)
{
Interlocked.CompareExchange(ref _cancellationReason, reason, null);
_cancellationSource.Cancel();
}
///
/// Handle a lease request
///
async Task HandleLeaseAsync(ISession session, Tracer tracer, ILogger logger, LeaseLoggerFactory leaseLoggerFactory)
{
using TelemetrySpan span = tracer.StartActiveSpan($"{nameof(LeaseHandler)}.{nameof(HandleLeaseAsync)}");
span.SetAttribute("horde.lease.id", RpcLease.Id.ToString());
span.SetAttribute("horde.agent.id", session.AgentId.ToString());
span.SetAttribute("horde.agent.session_id", session.SessionId.ToString());
logger.LogInformation("Handling lease {LeaseId}: {LeaseTypeUrl}", Id, RpcLease.Payload.TypeUrl);
// Get the lease outcome
LeaseResult result = LeaseResult.Failed;
try
{
span.SetAttribute("horde.lease.task", RpcPayload.TypeUrl);
result = await HandleLeasePayloadAsync(session, tracer, leaseLoggerFactory);
}
catch (OperationCanceledException) when (_cancellationSource.IsCancellationRequested)
{
logger.LogInformation("Lease {LeaseId} cancelled", Id);
}
catch (Exception ex)
{
logger.LogError(ex, "Unhandled exception while executing lease {LeaseId}: {Message}", Id, ex.Message);
span.SetStatus(Status.Error);
span.RecordException(ex);
}
// Update the state of the lease
RpcLease newRpcLease = new RpcLease(RpcLease);
if (_cancellationSource.IsCancellationRequested)
{
newRpcLease.State = RpcLeaseState.Cancelled;
newRpcLease.Outcome = RpcLeaseOutcome.Failed;
newRpcLease.Output = Google.Protobuf.ByteString.Empty;
}
else
{
newRpcLease.State = (result.Outcome == LeaseOutcome.Cancelled) ? RpcLeaseState.Cancelled : RpcLeaseState.Completed;
newRpcLease.Outcome = (RpcLeaseOutcome)result.Outcome;
newRpcLease.Output = (result.Output != null) ? ByteString.CopyFrom(result.Output) : ByteString.Empty;
}
RpcLease = newRpcLease;
logger.LogInformation("Transitioning lease {LeaseId} to {State}, outcome={Outcome}", Id, RpcLease.State, RpcLease.Outcome);
span.SetAttribute("horde.lease.state", RpcLease.State.ToString());
span.SetAttribute("horde.lease.outcome", RpcLease.Outcome.ToString());
return result;
}
///
/// Dispatch a lease payload to the appropriate handler
///
internal async Task HandleLeasePayloadAsync(ISession session, Tracer tracer, LeaseLoggerFactory leaseLoggerFactory)
{
using ILoggerFactory loggerFactory = leaseLoggerFactory.CreateLoggerFactory(Id);
ILogger leaseLogger = loggerFactory.CreateLogger(GetType());
return await ExecuteAsync(session, tracer, leaseLogger, _cancellationSource.Token);
}
///
/// Executes a lease
///
/// Result for the lease
protected abstract Task ExecuteAsync(ISession session, Tracer tracer, ILogger logger, CancellationToken cancellationToken);
///
/// Runs a child process, piping the output to the given logger
///
/// Executable to launch
/// Command line arguments for the new process
/// Environment for the new process
/// Logger for output
/// Cancellation token for the operation
/// Exit code of the process
protected static async Task RunProcessAsync(string executable, IEnumerable arguments, IReadOnlyDictionary? environment, ILogger logger, CancellationToken cancellationToken)
{
string commandLine = CommandLineArguments.Join(arguments);
logger.LogInformation("Running child process: {Executable} {CommandLine}", CommandLineArguments.Quote(executable), commandLine);
try
{
using (ManagedProcessGroup processGroup = new ManagedProcessGroup())
using (ManagedProcess process = new ManagedProcess(processGroup, executable, commandLine, null, environment, ProcessPriorityClass.Normal))
{
for (; ; )
{
string? line = await process.ReadLineAsync(cancellationToken);
if (line == null)
{
break;
}
JsonLogEvent jsonLogEvent;
if (JsonLogEvent.TryParse(line, out jsonLogEvent))
{
logger.LogJsonLogEvent(jsonLogEvent);
}
else
{
logger.LogInformation("{Line}", line);
}
}
await process.WaitForExitAsync(CancellationToken.None);
return process.ExitCode;
}
}
catch (Exception ex)
{
logger.LogError(ex, "Failed to run process: {Message}", ex.Message);
throw;
}
}
///
/// Runs a .NET assembly as a child process, piping the output to the given logger
///
/// Assembly to launch
/// Command line arguments for the new process
/// Environment for the new process
/// Whether to use a native host process
/// Logger for output
/// Cancellation token for the operation
/// Exit code of the process
protected static async Task RunDotNetProcessAsync(FileReference entryAssembly, IEnumerable arguments, IReadOnlyDictionary? environment, bool useNativeHost, ILogger logger, CancellationToken cancellationToken)
{
if (useNativeHost)
{
FileReference nativeHost = entryAssembly.ChangeExtension(OperatingSystem.IsWindows() ? ".exe" : null);
return await RunProcessAsync(nativeHost.FullName, arguments, environment, logger, cancellationToken);
}
else
{
IEnumerable allArguments = arguments.Prepend(entryAssembly.FullName);
return await RunProcessAsync("dotnet", allArguments, environment, logger, cancellationToken);
}
}
}
///
/// Implementation of for a specific lease type
///
/// Type of the lease message
abstract class LeaseHandler : LeaseHandler where T : IMessage, new()
{
protected LeaseHandler(RpcLease rpcLease)
: base(rpcLease)
{
}
///
protected override Task ExecuteAsync(ISession session, Tracer tracer, ILogger logger, CancellationToken cancellationToken)
{
return ExecuteAsync(session, Id, RpcLease.Payload.Unpack(), tracer, logger, cancellationToken);
}
///
protected abstract Task ExecuteAsync(ISession session, LeaseId leaseId, T message, Tracer tracer, ILogger logger, CancellationToken cancellationToken);
}
class DefaultLeaseHandler : LeaseHandler
{
readonly LeaseResult _result;
public DefaultLeaseHandler(RpcLease lease, LeaseResult result) : base(lease)
=> _result = result;
///
protected override Task ExecuteAsync(ISession session, Tracer tracer, ILogger logger, CancellationToken cancellationToken)
=> Task.FromResult(_result);
}
}