// Copyright Epic Games, Inc. All Rights Reserved.
using System.Diagnostics;
using System.Text;
using EpicGames.Core;
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using HordeAgent.Services;
using HordeCommon.Rpc;
using HordeCommon.Rpc.Messages;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using OpenTelemetry.Trace;
namespace HordeAgent.Leases
{
///
/// Implements the message handling loop for an agent. Runs asynchronously until disposed.
///
class LeaseManager
{
///
/// Number of leases completed
///
public int NumLeasesCompleted { get; private set; }
///
/// Delegate for lease active events
///
public delegate void LeaseEvent(RpcLease lease, LeaseResult? result = null);
///
/// Event triggered when a lease is accepted and started
///
public event LeaseEvent? OnLeaseStarted;
///
/// Event triggered when a lease is finished with an outcome
///
public event LeaseEvent? OnLeaseFinished;
///
/// List of pool IDs this session is a member of
///
public IReadOnlyList PoolIds { get; private set; } = new List();
///
/// Whether to terminate session after at least one lease has finished
/// Useful for shutting down as gracefully as possible without interrupting the current lease executing.
///
public bool TerminateSessionAfterLease { get; set; }
///
/// Object used for controlling access to the access tokens and active sessions list
///
readonly object _lockObject = new object();
///
/// The list of active leases.
///
readonly List _activeLeases = new List();
///
/// Whether the agent is currently in an unhealthy state
///
readonly bool _unhealthy = false;
///
/// Result from executing this session
///
SessionResult? _sessionResult;
///
/// Task completion source used to trigger the background thread to update the leases. Must take a lock on LockObject before
///
readonly AsyncEvent _updateLeasesEvent = new AsyncEvent();
///
/// Number of times UpdateSession has failed
///
int _updateSessionFailures;
///
/// How long to wait before trying to reacquire a new connection
/// Exposed as internal to ease testing. Using a lower delay can speed up tests.
///
internal TimeSpan _rpcConnectionRetryDelay = TimeSpan.FromSeconds(5);
readonly ISession _session;
readonly ICapabilitiesService _capabilitiesService;
readonly StatusService _statusService;
readonly ISystemMetrics _systemMetrics;
readonly Dictionary _typeUrlToLeaseHandler;
readonly LeaseLoggerFactory _leaseLoggerFactory;
readonly IOptionsMonitor _settings;
readonly Tracer _tracer;
readonly ILogger _logger;
RpcAgentCapabilities? _capabilities;
public LeaseManager(
ISession session,
ICapabilitiesService capabilitiesService,
StatusService statusService,
ISystemMetrics systemMetrics,
IEnumerable leaseHandlerFactories,
LeaseLoggerFactory leaseLoggerFactory,
IOptionsMonitor settings,
Tracer tracer,
ILogger logger)
{
_session = session;
_capabilitiesService = capabilitiesService;
_statusService = statusService;
_systemMetrics = systemMetrics;
_typeUrlToLeaseHandler = leaseHandlerFactories.ToDictionary(x => x.LeaseType, x => x);
_leaseLoggerFactory = leaseLoggerFactory;
_settings = settings;
_tracer = tracer;
_logger = logger;
}
public LeaseManager(ISession session, IServiceProvider serviceProvider)
: this(session,
serviceProvider.GetRequiredService(),
serviceProvider.GetRequiredService(),
serviceProvider.GetRequiredService(),
serviceProvider.GetRequiredService>(),
serviceProvider.GetRequiredService(),
serviceProvider.GetRequiredService>(),
serviceProvider.GetRequiredService(),
serviceProvider.GetRequiredService>())
{
}
public List GetActiveLeases()
{
lock (_lockObject)
{
return _activeLeases.Select(x => x.RpcLease).ToList();
}
}
///
/// Write a termination signal file to disk
/// Used to communicate an impending termination of workload. It does require workload to be aware of the file and also act on it.
///
/// Path to signal file
/// Reason for termination
/// Timestamp when termination takes place
/// Time left to live before termination (relative to above)
/// Cancellation token
public static Task WriteTerminationSignalFileAsync(string filePath, string reason, DateTime terminateAt, TimeSpan timeToLive, CancellationToken cancellationToken)
{
StringBuilder sb = new(100);
sb.Append("v1\n");
sb.Append($"{timeToLive.TotalMilliseconds}\n");
sb.Append($"{new DateTimeOffset(terminateAt).ToUnixTimeMilliseconds()}\n");
sb.Append($"{reason}\n");
return File.WriteAllTextAsync(filePath, sb.ToString(), cancellationToken);
}
public async Task RunAsync(CancellationToken stoppingToken)
{
SessionResult result;
try
{
result = await HandleSessionAsync(stoppingToken);
}
catch (OperationCanceledException ex) when (stoppingToken.IsCancellationRequested)
{
_logger.LogInformation(ex, "Execution cancelled");
result = new SessionResult(SessionOutcome.Terminate, SessionReason.Cancelled);
}
catch (Exception ex)
{
_logger.LogError(ex, "Exception while executing session. Restarting.");
result = new SessionResult(SessionOutcome.BackOff, SessionReason.Failed);
}
while (_activeLeases.Count > 0)
{
try
{
_logger.LogInformation("Draining leases... ({NumLeases} remaining)", _activeLeases.Count);
await DrainLeasesAsync("session terminating");
}
catch (Exception ex)
{
_logger.LogError(ex, "Exception while draining leases. Agent may be in an inconsistent state.");
await Task.Delay(TimeSpan.FromSeconds(30.0), CancellationToken.None);
}
}
return result;
}
///
/// Drain and terminate any currently running leases
///
/// Human-readable reason
/// Whether to warn workload about upcoming lease termination via signal file
async Task DrainLeasesAsync(string reason, bool graceful = false)
{
if (graceful)
{
TimeSpan ttl = TimeSpan.FromSeconds(1);
await WriteTerminationSignalFileAsync(_settings.CurrentValue.GetTerminationSignalFile().FullName, reason, DateTime.UtcNow + ttl, ttl, CancellationToken.None);
await Task.Delay(TimeSpan.FromSeconds(6)); // Allow workload some time to act on signal file
}
for (int idx = 0; idx < _activeLeases.Count; idx++)
{
LeaseHandler activeLease = _activeLeases[idx];
if (activeLease.Result.IsCompleted)
{
_activeLeases.RemoveAt(idx--);
_logger.LogInformation("Removed lease {LeaseId}", activeLease.Id);
}
else
{
_logger.LogInformation("Cancelling active lease {LeaseId}", activeLease.Id);
activeLease.Cancel(reason);
}
}
while (_activeLeases.Count > 0)
{
List tasks = _activeLeases.Select(x => (Task)x.Result).ToList();
tasks.Add(Task.Delay(TimeSpan.FromMinutes(1.0)));
await Task.WhenAny(tasks);
for (int idx = 0; idx < _activeLeases.Count; idx++)
{
LeaseHandler activeLease = _activeLeases[idx];
if (activeLease.Result.IsCompleted)
{
_activeLeases.RemoveAt(idx--);
try
{
await activeLease.Result;
}
catch (OperationCanceledException)
{
}
catch (Exception ex)
{
_logger.LogError(ex, "Lease {LeaseId} threw an exception while terminating", activeLease.Id);
}
_logger.LogInformation("Lease {LeaseId} has completed", activeLease.Id);
activeLease.Dispose();
}
else
{
_logger.LogInformation("Still waiting for lease {LeaseId} to terminate...", activeLease.Id);
}
}
}
}
async Task HandleSessionAsync(CancellationToken stoppingToken)
{
HordeRpc.HordeRpcClient hordeRpc = await _session.HordeClient.CreateGrpcClientAsync(stoppingToken);
// Terminate any remaining child processes from other instances
ProcessUtils.TerminateProcesses(x => x.IsUnderDirectory(_session.WorkingDir), _logger, stoppingToken);
// Track how many updates we get in 10 seconds. We'll start rate limiting this if it looks like we've got a problem that's causing us to spam the server.
Stopwatch updateTimer = Stopwatch.StartNew();
Queue updateTimes = new Queue();
// Run a background task to update the capabilities of this agent
await using BackgroundTask updateCapsTask = BackgroundTask.StartNew(ctx => UpdateCapabilitiesBackgroundAsync(_session.WorkingDir, ctx));
// Run another background task to send telemetry data
await using BackgroundTask telemetryTask = BackgroundTask.StartNew(ctx => SendTelemetryAsync(hordeRpc, ctx));
// Loop until we're ready to exit
Stopwatch updateCapabilitiesTimer = Stopwatch.StartNew();
for (; ; )
{
Task waitTask = Task.WhenAny(_updateLeasesEvent.Task, _statusService.StatusChangedEvent.Task);
// Flag for whether the service is stopping
if (stoppingToken.IsCancellationRequested && _sessionResult == null)
{
_logger.LogInformation("Cancellation from token requested; setting session result to terminate.");
_sessionResult = new SessionResult(SessionOutcome.Terminate, SessionReason.Cancelled);
}
bool stopping = false;
if (_sessionResult != null)
{
_logger.LogInformation("Session termination requested (result: {Result})", _sessionResult.Outcome);
stopping = true;
}
// Build the next update request
RpcUpdateSessionRequest updateSessionRequest = new RpcUpdateSessionRequest();
updateSessionRequest.AgentId = _session.AgentId.ToString();
updateSessionRequest.SessionId = _session.SessionId.ToString();
// Get the new the lease states. If a restart is requested and we have no active leases, signal to the server that we're stopping.
lock (_lockObject)
{
foreach (LeaseHandler activeLease in _activeLeases)
{
updateSessionRequest.Leases.Add(new RpcLease(activeLease.RpcLease));
}
if (_sessionResult != null && _activeLeases.Count == 0)
{
stopping = true;
}
}
// Get the new agent status to be reported back to server
if (stopping)
{
// If stopping, ensure capabilities are updated one last time
_capabilities = await _capabilitiesService.GetCapabilitiesAsync(_session.WorkingDir);
updateSessionRequest.Status = RpcAgentStatus.Stopping;
}
else if (_unhealthy)
{
updateSessionRequest.Status = RpcAgentStatus.Unhealthy;
}
else if (_statusService.IsBusy)
{
updateSessionRequest.Status = RpcAgentStatus.Busy;
}
else
{
updateSessionRequest.Status = RpcAgentStatus.Ok;
}
// Update the capabilities whenever the background task has generated a new instance
updateSessionRequest.Capabilities = Interlocked.Exchange(ref _capabilities, null);
// Complete the wait task if we subsequently stop
using (stopping ? (CancellationTokenRegistration?)null : stoppingToken.Register(() => _updateLeasesEvent.Set()))
{
// Update the state with the server
RpcUpdateSessionResponse? updateSessionResponse = await UpdateSessionAsync(hordeRpc, updateSessionRequest, waitTask);
lock (_lockObject)
{
// Now reconcile the local state to match what the server reports
if (updateSessionResponse != null)
{
if (TerminateSessionAfterLease)
{
bool atLeastOneLeaseFinished = _activeLeases.Any(x => x.RpcLease.State is RpcLeaseState.Completed or RpcLeaseState.Cancelled);
if (atLeastOneLeaseFinished || _activeLeases.Count == 0)
{
_logger.LogInformation("Session termination requested. At least one lease executed or no lease is active, proceeding...");
_sessionResult = new SessionResult(SessionOutcome.Terminate, SessionReason.Completed);
}
}
PoolIds = updateSessionResponse.PoolIds;
// Remove any leases which have completed
int numRemoved = _activeLeases.RemoveAll(x => (x.RpcLease.State == RpcLeaseState.Completed || x.RpcLease.State == RpcLeaseState.Cancelled) && !updateSessionResponse.Leases.Any(y => y.Id == x.RpcLease.Id && y.State != RpcLeaseState.Cancelled));
NumLeasesCompleted += numRemoved;
// Create any new leases and cancel any running leases
foreach (RpcLease serverLease in updateSessionResponse.Leases)
{
if (serverLease.State == RpcLeaseState.Cancelled)
{
LeaseHandler? handler = _activeLeases.FirstOrDefault(x => x.RpcLease.Id == serverLease.Id);
if (handler != null)
{
_logger.LogInformation("Cancelling lease {LeaseId}", serverLease.Id);
handler.Cancel("cancelled by server");
}
}
if (serverLease.State == RpcLeaseState.Pending && !_activeLeases.Any(x => x.RpcLease.Id == serverLease.Id))
{
serverLease.State = RpcLeaseState.Active;
_logger.LogInformation("Adding lease {LeaseId}", serverLease.Id);
LeaseHandler leaseHandler = CreateLeaseHandler(serverLease);
leaseHandler.Start(_session, _tracer, _logger, _leaseLoggerFactory);
leaseHandler.Result.ContinueWith((Task task) =>
{
LeaseResult result = task.Result;
if (result.SessionResult != null && _sessionResult == null)
{
_logger.LogInformation("Lease {LeaseId} is setting session result to {Result}", leaseHandler.Id, result.SessionResult.Outcome);
_sessionResult = result.SessionResult;
}
OnLeaseFinished?.Invoke(serverLease, task.Result);
_updateLeasesEvent.Set();
}, TaskScheduler.Default);
_activeLeases.Add(leaseHandler);
OnLeaseStarted?.Invoke(serverLease);
}
}
// Update the session result if we've transitioned to stopped
if (updateSessionResponse.Status == RpcAgentStatus.Stopped)
{
SessionResult result = _sessionResult ?? new SessionResult(SessionOutcome.BackOff, SessionReason.Completed);
_logger.LogInformation("Agent status is stopped; returning from session update loop with result {Result}", result);
return result;
}
}
}
// Update the current status
if (!_session.HordeClient.HasValidAccessToken())
{
_statusService.Set(false, _activeLeases.Count, "Attempting to connect to server...");
}
else if (_statusService.IsBusy)
{
_statusService.Set(true, 0, "Paused");
if (_activeLeases.Count > 0)
{
_logger.LogInformation("Agent marked itself as busy. Draining any active leases to prevent them from using up local resources...");
await DrainLeasesAsync("user is active", true);
}
}
else if (_activeLeases.Count == 0)
{
_statusService.Set(true, 0, "Waiting for work");
}
else
{
_statusService.Set(true, _activeLeases.Count, $"Executing {_activeLeases.Count} lease(s)");
}
}
// Update the historical update times
TimeSpan updateTime = updateTimer.Elapsed;
while (updateTimes.TryPeek(out TimeSpan firstTime) && firstTime < updateTime - TimeSpan.FromMinutes(1.0))
{
updateTimes.Dequeue();
}
updateTimes.Enqueue(updateTime);
// If we're updating too much, introduce an artificial delay
if (updateTimes.Count > 60)
{
_logger.LogWarning("Agent is issuing large number of UpdateSession() calls. Delaying for 10 seconds.");
await Task.Delay(TimeSpan.FromSeconds(10.0), stoppingToken);
}
}
}
LeaseHandler CreateLeaseHandler(RpcLease lease)
{
Any payload = lease.Payload;
if (!_typeUrlToLeaseHandler.TryGetValue(payload.TypeUrl, out LeaseHandlerFactory? leaseHandlerFactory))
{
_logger.LogError("Invalid lease payload type ({PayloadType})", payload.TypeUrl);
return new DefaultLeaseHandler(lease, LeaseResult.Failed);
}
return leaseHandlerFactory.CreateHandler(lease);
}
///
/// Wrapper for which filters/logs exceptions
///
/// The RPC client connection
/// The session update request
/// Task which can be used to jump out of the update early
/// Response from the call
async Task UpdateSessionAsync(HordeRpc.HordeRpcClient hordeRpc, RpcUpdateSessionRequest updateSessionRequest, Task waitTask)
{
const int MaxRetries = 8;
const int BaseRetryDelayMs = 1000;
const int MaxDelayMs = 20000;
RpcUpdateSessionResponse? updateSessionResponse = null;
try
{
updateSessionResponse = await UpdateSessionInternalAsync(hordeRpc, updateSessionRequest, waitTask);
_updateSessionFailures = 0;
}
catch (Exception ex) when (ex is RpcException or HttpRequestException)
{
_updateSessionFailures++;
if (_updateSessionFailures >= MaxRetries)
{
throw;
}
int retryDelayMs = Math.Min((int)(BaseRetryDelayMs * Math.Pow(2, _updateSessionFailures - 1)), MaxDelayMs);
_logger.LogWarning(ex, "Error while updating session. Backing off {DelayMs} ms before next attempt... ({Attempt} of {MaxAttempts})", retryDelayMs, _updateSessionFailures, MaxRetries);
await Task.Delay(retryDelayMs);
}
return updateSessionResponse;
}
///
/// Tries to update the session state on the server.
///
/// This operation is a little gnarly due to the fact that we want to long-poll for the result.
/// Since we're doing the update via a gRPC call, the way to do that without using cancellation tokens is to keep the request stream open
/// until we want to terminate the call (see https://github.com/grpc/grpc/issues/8277). In order to do that, we need to make a
/// bidirectional streaming call, even though we only expect one response/response.
///
/// The RPC client
/// The session update request
/// Task to use to terminate the wait
/// The response object
async Task UpdateSessionInternalAsync(HordeRpc.HordeRpcClient rpcClient, RpcUpdateSessionRequest request, Task waitTask)
{
DateTime deadline = DateTime.UtcNow + TimeSpan.FromMinutes(2.0);
using AsyncDuplexStreamingCall call = rpcClient.UpdateSession(deadline: deadline);
_logger.LogDebug("Updating session {SessionId} (Status={Status})", request.SessionId, request.Status);
// Write the request to the server
await call.RequestStream.WriteAsync(request);
// Wait until the server responds or we need to trigger a new update
Task moveNextAsync = call.ResponseStream.MoveNext();
Task task = await Task.WhenAny(moveNextAsync, waitTask);
if (task == waitTask)
{
_logger.LogDebug("Cancelling long poll from client side (new update)");
}
// Close the request stream to indicate that we're finished
await call.RequestStream.CompleteAsync();
// Wait for a response or a new update to come in, then close the request stream
RpcUpdateSessionResponse? response = null;
while (await moveNextAsync)
{
response = call.ResponseStream.Current;
moveNextAsync = call.ResponseStream.MoveNext();
}
return response;
}
///
/// Background task that updates the capabilities of this agent
///
async Task UpdateCapabilitiesBackgroundAsync(DirectoryReference workingDir, CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
await Task.Delay(TimeSpan.FromMinutes(5.0), cancellationToken);
try
{
Interlocked.Exchange(ref _capabilities, await _capabilitiesService.GetCapabilitiesAsync(workingDir));
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Unable to query agent capabilities. Ignoring.");
}
}
}
///
/// Periodically send agent telemetry to the server
///
async Task SendTelemetryAsync(HordeRpc.HordeRpcClient hordeRpc, CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
try
{
CpuMetrics? cpuMetrics = _systemMetrics.GetCpu();
MemoryMetrics? memMetrics = _systemMetrics.GetMemory();
DiskMetrics? diskMetrics = _systemMetrics.GetDisk();
if (cpuMetrics != null || memMetrics != null || diskMetrics != null)
{
RpcUploadTelemetryRequest request = new RpcUploadTelemetryRequest();
request.AgentId = _session.AgentId.ToString();
if (cpuMetrics != null)
{
request.UserCpu = cpuMetrics.User;
request.SystemCpu = cpuMetrics.System;
request.IdleCpu = cpuMetrics.Idle;
}
if (memMetrics != null)
{
request.TotalRam = memMetrics.Total / 1024;
request.FreeRam = memMetrics.Available / 1024;
request.UsedRam = memMetrics.Used / 1024;
}
if (diskMetrics != null)
{
request.FreeDisk = (ulong)(diskMetrics.FreeSpace / (1024 * 1024));
request.TotalDisk = (ulong)(diskMetrics.TotalSize / (1024 * 1024));
}
await hordeRpc.UploadTelemetryAsync(request, cancellationToken: cancellationToken);
}
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
break;
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Exception sending telemetry data: {Message}", ex.Message);
}
await Task.Delay(TimeSpan.FromSeconds(30.0), cancellationToken);
}
}
}
}