// Copyright Epic Games, Inc. All Rights Reserved.

using System.Diagnostics;
using EpicGames.Core;
using EpicGames.Horde.Agents.Leases;
using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;
using HordeAgent.Services;
using HordeCommon.Rpc;
using HordeCommon.Rpc.Messages;
using Microsoft.Extensions.Logging;
using OpenTelemetry.Trace;
using ByteString = Google.Protobuf.ByteString;

namespace HordeAgent.Leases
{
	/// <summary>
	/// Handles execution of a specific lease type
	/// </summary>
	abstract class LeaseHandler : IDisposable
	{
		/// <summary>
		/// Identifier for this lease
		/// </summary>
		public LeaseId Id { get; }

		/// <summary>
		/// The RPC lease state. Replaced with an updated copy once the lease has finished executing.
		/// </summary>
		public RpcLease RpcLease { get; set; }

		/// <summary>
		/// Payload from the lease
		/// </summary>
		public Any RpcPayload { get; }

		/// <summary>
		/// Result from executing the lease. Until <see cref="Start"/> is called, this is a task faulted with
		/// an <see cref="InvalidOperationException"/>.
		/// </summary>
		public Task<LeaseResult> Result { get; private set; }

		/// <summary>
		/// Whether the lease has been cancelled
		/// </summary>
		public bool IsCancelled => _cancellationSource.IsCancellationRequested;

		/// <summary>
		/// Reason for cancellation, or "unknown" if no reason has been recorded
		/// </summary>
		public string CancellationReason => _cancellationReason ?? "unknown";

		readonly CancellationTokenSource _cancellationSource = new CancellationTokenSource();
		string? _cancellationReason;

		/// <summary>
		/// Constructor
		/// </summary>
		/// <param name="rpcLease">The lease to be handled</param>
		protected LeaseHandler(RpcLease rpcLease)
		{
			Id = rpcLease.Id;
			RpcLease = rpcLease;
			RpcPayload = rpcLease.Payload;
			Result = Task.FromException<LeaseResult>(new InvalidOperationException("Lease has not been started"));
		}

		/// <inheritdoc/>
		public void Dispose()
		{
			Dispose(disposing: true);
			GC.SuppressFinalize(this);
		}

		/// <summary>
		/// Overridable dispose method
		/// </summary>
		/// <param name="disposing">True when called from <see cref="Dispose()"/></param>
		protected virtual void Dispose(bool disposing)
		{
			if (disposing)
			{
				_cancellationSource.Dispose();
			}
		}

		/// <summary>
		/// Starts executing the lease
		/// </summary>
		/// <param name="session">The current agent session</param>
		/// <param name="tracer">Tracer used to create telemetry spans</param>
		/// <param name="logger">Logger for agent-level output</param>
		/// <param name="leaseLoggerFactory">Factory for the lease-specific logger</param>
		public void Start(ISession session, Tracer tracer, ILogger logger, LeaseLoggerFactory leaseLoggerFactory)
		{
			Result = Task.Run(() => HandleLeaseAsync(session, tracer, logger, leaseLoggerFactory));
		}

		/// <summary>
		/// Cancels the lease
		/// </summary>
		/// <param name="reason">Reason for the cancellation</param>
		public void Cancel(string reason)
		{
			// Only the first reason supplied is kept; later calls leave it unchanged
			Interlocked.CompareExchange(ref _cancellationReason, reason, null);
			_cancellationSource.Cancel();
		}

		/// <summary>
		/// Handle a lease request
		/// </summary>
		/// <param name="session">The current agent session</param>
		/// <param name="tracer">Tracer used to create telemetry spans</param>
		/// <param name="logger">Logger for agent-level output</param>
		/// <param name="leaseLoggerFactory">Factory for the lease-specific logger</param>
		/// <returns>Result from executing the lease; <see cref="LeaseResult.Failed"/> if execution threw</returns>
		async Task<LeaseResult> HandleLeaseAsync(ISession session, Tracer tracer, ILogger logger, LeaseLoggerFactory leaseLoggerFactory)
		{
			using TelemetrySpan span = tracer.StartActiveSpan($"{nameof(LeaseHandler)}.{nameof(HandleLeaseAsync)}");
			span.SetAttribute("horde.lease.id", RpcLease.Id.ToString());
			span.SetAttribute("horde.agent.id", session.AgentId.ToString());
			span.SetAttribute("horde.agent.session_id", session.SessionId.ToString());
			logger.LogInformation("Handling lease {LeaseId}: {LeaseTypeUrl}", Id, RpcLease.Payload.TypeUrl);

			// Get the lease outcome
			LeaseResult result = LeaseResult.Failed;
			try
			{
				span.SetAttribute("horde.lease.task", RpcPayload.TypeUrl);
				result = await HandleLeasePayloadAsync(session, tracer, leaseLoggerFactory);
			}
			catch (OperationCanceledException) when (_cancellationSource.IsCancellationRequested)
			{
				logger.LogInformation("Lease {LeaseId} cancelled", Id);
			}
			catch (Exception ex)
			{
				logger.LogError(ex, "Unhandled exception while executing lease {LeaseId}: {Message}", Id, ex.Message);
				span.SetStatus(Status.Error);
				span.RecordException(ex);
			}

			// Update the state of the lease
			RpcLease newRpcLease = new RpcLease(RpcLease);
			if (_cancellationSource.IsCancellationRequested)
			{
				// A requested cancellation overrides whatever result the handler produced
				newRpcLease.State = RpcLeaseState.Cancelled;
				newRpcLease.Outcome = RpcLeaseOutcome.Failed;
				newRpcLease.Output = ByteString.Empty;
			}
			else
			{
				newRpcLease.State = (result.Outcome == LeaseOutcome.Cancelled) ? RpcLeaseState.Cancelled : RpcLeaseState.Completed;
				newRpcLease.Outcome = (RpcLeaseOutcome)result.Outcome;
				newRpcLease.Output = (result.Output != null) ? ByteString.CopyFrom(result.Output) : ByteString.Empty;
			}
			RpcLease = newRpcLease;

			logger.LogInformation("Transitioning lease {LeaseId} to {State}, outcome={Outcome}", Id, RpcLease.State, RpcLease.Outcome);
			span.SetAttribute("horde.lease.state", RpcLease.State.ToString());
			span.SetAttribute("horde.lease.outcome", RpcLease.Outcome.ToString());

			return result;
		}

		/// <summary>
		/// Dispatch a lease payload to the appropriate handler
		/// </summary>
		/// <param name="session">The current agent session</param>
		/// <param name="tracer">Tracer used to create telemetry spans</param>
		/// <param name="leaseLoggerFactory">Factory for the lease-specific logger</param>
		/// <returns>Result for the lease</returns>
		internal async Task<LeaseResult> HandleLeasePayloadAsync(ISession session, Tracer tracer, LeaseLoggerFactory leaseLoggerFactory)
		{
			// The logger factory is scoped to this lease and disposed once execution finishes
			using ILoggerFactory loggerFactory = leaseLoggerFactory.CreateLoggerFactory(Id);
			ILogger leaseLogger = loggerFactory.CreateLogger(GetType());
			return await ExecuteAsync(session, tracer, leaseLogger, _cancellationSource.Token);
		}

		/// <summary>
		/// Executes a lease
		/// </summary>
		/// <param name="session">The current agent session</param>
		/// <param name="tracer">Tracer used to create telemetry spans</param>
		/// <param name="logger">Logger for lease output</param>
		/// <param name="cancellationToken">Cancellation token for the operation</param>
		/// <returns>Result for the lease</returns>
		protected abstract Task<LeaseResult> ExecuteAsync(ISession session, Tracer tracer, ILogger logger, CancellationToken cancellationToken);

		/// <summary>
		/// Runs a child process, piping the output to the given logger
		/// </summary>
		/// <param name="executable">Executable to launch</param>
		/// <param name="arguments">Command line arguments for the new process</param>
		/// <param name="environment">Environment for the new process</param>
		/// <param name="logger">Logger for output</param>
		/// <param name="cancellationToken">Cancellation token for the operation</param>
		/// <returns>Exit code of the process</returns>
		protected static async Task<int> RunProcessAsync(string executable, IEnumerable<string> arguments, IReadOnlyDictionary<string, string>? environment, ILogger logger, CancellationToken cancellationToken)
		{
			string commandLine = CommandLineArguments.Join(arguments);
			logger.LogInformation("Running child process: {Executable} {CommandLine}", CommandLineArguments.Quote(executable), commandLine);

			try
			{
				using (ManagedProcessGroup processGroup = new ManagedProcessGroup())
				using (ManagedProcess process = new ManagedProcess(processGroup, executable, commandLine, null, environment, ProcessPriorityClass.Normal))
				{
					for (; ; )
					{
						string? line = await process.ReadLineAsync(cancellationToken);
						if (line == null)
						{
							break;
						}

						// Forward structured log events as-is; anything else is logged as plain text
						if (JsonLogEvent.TryParse(line, out JsonLogEvent jsonLogEvent))
						{
							logger.LogJsonLogEvent(jsonLogEvent);
						}
						else
						{
							logger.LogInformation("{Line}", line);
						}
					}

					// Output has reached end-of-stream, so wait for the exit code without observing cancellation
					await process.WaitForExitAsync(CancellationToken.None);
					return process.ExitCode;
				}
			}
			catch (Exception ex) when (ex is not OperationCanceledException || !cancellationToken.IsCancellationRequested)
			{
				// A requested cancellation is not a process failure; it bypasses this handler and propagates
				// to the caller without an error entry in the log.
				logger.LogError(ex, "Failed to run process: {Message}", ex.Message);
				throw;
			}
		}

		/// <summary>
		/// Runs a .NET assembly as a child process, piping the output to the given logger
		/// </summary>
		/// <param name="entryAssembly">Assembly to launch</param>
		/// <param name="arguments">Command line arguments for the new process</param>
		/// <param name="environment">Environment for the new process</param>
		/// <param name="useNativeHost">Whether to use a native host process</param>
		/// <param name="logger">Logger for output</param>
		/// <param name="cancellationToken">Cancellation token for the operation</param>
		/// <returns>Exit code of the process</returns>
		protected static async Task<int> RunDotNetProcessAsync(FileReference entryAssembly, IEnumerable<string> arguments, IReadOnlyDictionary<string, string>? environment, bool useNativeHost, ILogger logger, CancellationToken cancellationToken)
		{
			if (useNativeHost)
			{
				// The native host sits next to the assembly: ".exe" on Windows, no extension elsewhere
				FileReference nativeHost = entryAssembly.ChangeExtension(OperatingSystem.IsWindows() ? ".exe" : null);
				return await RunProcessAsync(nativeHost.FullName, arguments, environment, logger, cancellationToken);
			}
			else
			{
				// Launch through the "dotnet" host, passing the assembly path as the first argument
				IEnumerable<string> allArguments = arguments.Prepend(entryAssembly.FullName);
				return await RunProcessAsync("dotnet", allArguments, environment, logger, cancellationToken);
			}
		}
	}

	/// <summary>
	/// Implementation of <see cref="LeaseHandler"/> for a specific lease type
	/// </summary>
	/// <typeparam name="T">Type of the lease message</typeparam>
	abstract class LeaseHandler<T> : LeaseHandler where T : IMessage, new()
	{
		/// <summary>
		/// Constructor
		/// </summary>
		/// <param name="rpcLease">The lease to be handled</param>
		protected LeaseHandler(RpcLease rpcLease)
			: base(rpcLease)
		{
		}

		/// <inheritdoc/>
		protected override Task<LeaseResult> ExecuteAsync(ISession session, Tracer tracer, ILogger logger, CancellationToken cancellationToken)
		{
			return ExecuteAsync(session, Id, RpcLease.Payload.Unpack<T>(), tracer, logger, cancellationToken);
		}

		/// <summary>
		/// Executes a lease using its unpacked payload message
		/// </summary>
		/// <param name="session">The current agent session</param>
		/// <param name="leaseId">Identifier for the lease</param>
		/// <param name="message">Payload unpacked from the lease</param>
		/// <param name="tracer">Tracer used to create telemetry spans</param>
		/// <param name="logger">Logger for lease output</param>
		/// <param name="cancellationToken">Cancellation token for the operation</param>
		/// <returns>Result for the lease</returns>
		protected abstract Task<LeaseResult> ExecuteAsync(ISession session, LeaseId leaseId, T message, Tracer tracer, ILogger logger, CancellationToken cancellationToken);
	}

	/// <summary>
	/// Lease handler which completes immediately with a result supplied at construction
	/// </summary>
	class DefaultLeaseHandler : LeaseHandler
	{
		readonly LeaseResult _result;

		/// <summary>
		/// Constructor
		/// </summary>
		/// <param name="lease">The lease to be handled</param>
		/// <param name="result">Result to return when the lease is executed</param>
		public DefaultLeaseHandler(RpcLease lease, LeaseResult result)
			: base(lease) => _result = result;

		/// <inheritdoc/>
		protected override Task<LeaseResult> ExecuteAsync(ISession session, Tracer tracer, ILogger logger, CancellationToken cancellationToken)
			=> Task.FromResult(_result);
	}
}