2263 lines
85 KiB
C#
2263 lines
85 KiB
C#
// Copyright Epic Games, Inc. All Rights Reserved.
|
|
|
|
|
|
using System;
|
|
using System.Collections;
|
|
using System.Collections.Generic;
|
|
using System.Data.SqlClient;
|
|
using System.Diagnostics;
|
|
using System.IO;
|
|
using System.Runtime.InteropServices;
|
|
using System.Security.Cryptography.X509Certificates;
|
|
using System.Threading;
|
|
|
|
using AgentInterface;
|
|
using SwarmCoordinatorInterface;
|
|
|
|
namespace Agent
|
|
{
|
|
///////////////////////////////////////////////////////////////////////////
|
|
|
|
/// <summary>
/// A single Task of a Job: its specification plus the state, owning connection and
/// timestamps the Agent records for it.
/// </summary>
public class AgentTask
{
	/// <summary>
	/// Wraps a Task specification. The new Task starts in TASK_STATE_IDLE with no owner.
	/// </summary>
	/// <param name="NewSpecification">Specification of the Task; its Job and Task GUIDs seed CurrentState.</param>
	public AgentTask( AgentTaskSpecification NewSpecification )
	{
		Specification = NewSpecification;
		CurrentState = new AgentTaskState( NewSpecification.JobGuid, NewSpecification.TaskGuid, EJobTaskState.TASK_STATE_IDLE );
		CurrentOwner = null;
	}

	/// <summary>
	/// Comparison that orders Tasks from highest to lowest Specification.Cost.
	/// If either Task has no specification, the pair is reported as equal.
	/// </summary>
	public static Int32 CompareTasksByCostDescending( AgentTask A, AgentTask B )
	{
		AgentTaskSpecification SpecA = A.Specification;
		AgentTaskSpecification SpecB = B.Specification;
		bool bCanCompare = ( SpecA != null ) && ( SpecB != null );

		// Negating the natural (ascending) comparison gives descending order
		return bCanCompare ? -SpecA.Cost.CompareTo( SpecB.Cost ) : 0;
	}

	/// <summary>
	/// Comparison that orders Tasks by AssignTime, earliest first.
	/// </summary>
	public static Int32 CompareTasksByAssignTime( AgentTask A, AgentTask B )
	{
		return DateTime.Compare( A.AssignTime, B.AssignTime );
	}

	/// <summary>The specification describing this Task.</summary>
	public AgentTaskSpecification Specification;

	/// <summary>Most recent state recorded for this Task.</summary>
	public AgentTaskState CurrentState;

	/// <summary>Connection currently owning the Task; null when unowned.</summary>
	public Connection CurrentOwner;

	/// <summary>When the Task was assigned (ordering key for CompareTasksByAssignTime).</summary>
	public DateTime AssignTime;

	/// <summary>When the Task started.</summary>
	public DateTime StartTime;

	/// <summary>When the Task stopped.</summary>
	public DateTime StopTime;
}
|
|
|
|
public class AgentJob
|
|
{
|
|
/// <summary>
/// Lifecycle of a Job; each value notes the call that moves the Job into it.
/// </summary>
public enum JobState
{
	/// <summary>Jobs start in this state.</summary>
	AGENT_JOB_UNSPECIFIED = 0,
	/// <summary>Once BeginJobSpecification is called.</summary>
	AGENT_JOB_PENDING = 1,
	/// <summary>Once EndJobSpecification is called.</summary>
	AGENT_JOB_RUNNING = 2,
	/// <summary>Once CloseJob is called.</summary>
	AGENT_JOB_CLOSED = 3
}

/// <summary>
/// Final outcome of a Job, decided when it is closed or its process exits.
/// </summary>
public enum JobSuccessState
{
	/// <summary>Jobs start in this state; the outcome is not yet known.</summary>
	AGENT_JOB_INCOMPLETE = 0,
	/// <summary>The Job was a success.</summary>
	AGENT_JOB_SUCCESS = 1,
	/// <summary>The Job was a failure.</summary>
	AGENT_JOB_FAILURE = 2
}
|
|
|
|
/// <summary>
/// Creates a Job serviced by the given Agent. Every other member keeps its
/// default/field-initializer value.
/// </summary>
/// <param name="NewManager">
/// The Agent this Job works through (logging, channel lookups, the active-job table).
/// </param>
public AgentJob( Agent NewManager )
{
	this.Manager = NewManager;
}
|
|
|
|
/// <summary>
/// Binds this Job to the connection that opened it and registers it with the Agent.
/// </summary>
/// <remarks>
/// When the owner is a LocalConnection the Job is owned by the Instigator. In that case
/// the Agent's current directory follows the Instigator process, and the visualizer
/// marks this machine as InstigatorConnected.
/// </remarks>
/// <param name="NewOwner">Owning connection; it is linked back to this Job.</param>
/// <param name="NewJobGuid">GUID under which the Job is added to Manager.ActiveJobs.</param>
/// <returns>Constants.SUCCESS; any failure propagates as an exception.</returns>
public Int32 OpenJob( Connection NewOwner, AgentGuid NewJobGuid )
{
	// Create the bi-directional link between the job and its owner
	NewOwner.Job = this;
	Owner = NewOwner;

	LocalConnection InstigatorConnection = NewOwner as LocalConnection;
	if( InstigatorConnection != null )
	{
		OwnerIsInstigator = true;

		// Run relative to where the Instigator process is executing
		Manager.SetCurrentDirectoryByProcessID( InstigatorConnection.ProcessID );

		// Update the visualizer
		AgentApplication.UpdateMachineState( System.Net.Dns.GetHostName(), -1, EProgressionState.InstigatorConnected );
	}

	// Add to the Agent-wide list of active jobs
	JobGuid = NewJobGuid;
	Manager.ActiveJobs.Add( NewJobGuid, this );

	return Constants.SUCCESS;
}
|
|
|
|
/// <summary>
/// Makes sure one dependency is available in the cache for RequestingConnection.
/// </summary>
/// <remarks>
/// A cache hit (Manager.TestChannel succeeds) needs no further work. On a miss:
/// - a RemoteConnection tries to pull the file from the remote Agent via Manager.PullChannel;
/// - a LocalConnection cannot recover.
/// Only misses on required files fail the request. Optional misses are logged as warnings.
/// </remarks>
/// <returns>false only when a required dependency could not be obtained.</returns>
private bool RequestDependency(Connection RequestingConnection, string Dependency, bool bIsRequired)
{
	Hashtable InParameters = new Hashtable();
	InParameters["Version"] = ESwarmVersionValue.VER_1_0;
	InParameters["ChannelName"] = Dependency;
	Hashtable OutParameters = null;

	bool bAlreadyCached = Manager.TestChannel(RequestingConnection.Handle, InParameters, ref OutParameters) >= 0;
	if (bAlreadyCached)
	{
		return true;
	}

	RemoteConnection RemoteRequester = RequestingConnection as RemoteConnection;
	if (RemoteRequester == null)
	{
		Debug.Assert(RequestingConnection is LocalConnection);

		// Always fail on local connection misses for required files
		if (bIsRequired)
		{
			Manager.Log(EVerbosityLevel.Informative, ELogColour.Red, "[Job] Error: Failed to find required file: " + Dependency);
			return false;
		}
		Manager.Log(EVerbosityLevel.Verbose, ELogColour.Orange, "[Job] Warning: Failed to find an optional file: " + Dependency);
		return true;
	}

	// For remote connection misses, try to pull the file from the instigator
	Manager.Log(EVerbosityLevel.Verbose, ELogColour.Green, "[Job] Attempting to pull file from remote Agent: " + Dependency);
	if (Manager.PullChannel(RemoteRequester, Dependency, null, 5))
	{
		return true;
	}

	if (bIsRequired)
	{
		Manager.Log(EVerbosityLevel.Informative, ELogColour.Red, "[Job] Error: Failed to pull a required file from remote Agent: " + Dependency);
		return false;
	}
	Manager.Log(EVerbosityLevel.Verbose, ELogColour.Orange, "[Job] Warning: Failed to pull an optional file from remote Agent: " + Dependency);
	return true;
}
|
|
|
|
/// <summary>
/// Ensures that everything a Job specification lists is available to RequestingConnection:
/// the executable and each required dependency as required files, and each optional
/// dependency as an optional file.
/// </summary>
/// <returns>false as soon as one request fails; true otherwise.</returns>
private bool RequestJobFiles( Connection RequestingConnection, AgentJobSpecification JobSpecification )
{
	// The executable is always required
	bool bAllFilesAvailable = RequestDependency( RequestingConnection, JobSpecification.ExecutableName, true );

	if( bAllFilesAvailable && ( JobSpecification.RequiredDependencies != null ) )
	{
		foreach( string Dependency in JobSpecification.RequiredDependencies )
		{
			bAllFilesAvailable = RequestDependency( RequestingConnection, Dependency, true );
			if( !bAllFilesAvailable )
			{
				break;
			}
		}
	}

	if( bAllFilesAvailable && ( JobSpecification.OptionalDependencies != null ) )
	{
		foreach( string Dependency in JobSpecification.OptionalDependencies )
		{
			bAllFilesAvailable = RequestDependency( RequestingConnection, Dependency, false );
			if( !bAllFilesAvailable )
			{
				break;
			}
		}
	}

	return bAllFilesAvailable;
}
|
|
|
|
#if !__MonoCS__
/// <summary>
/// P/Invoke for kernel32!IsWow64Process: reports whether hProcess is a 32-bit process
/// running under WOW64 on 64-bit Windows.
/// </summary>
/// <returns>false if the call itself failed.</returns>
[DllImport("kernel32.dll", SetLastError = true, CallingConvention = CallingConvention.Winapi)]
[return: MarshalAs(UnmanagedType.Bool)]
public static extern bool IsWow64Process([In] IntPtr hProcess, [Out] out bool lpSystemInfo);
#endif

/// <summary>
/// Determines whether this machine can run 64-bit job executables, i.e. whether it is
/// running a 64-bit operating system.
/// </summary>
/// <returns>
/// true for a 64-bit process, or for a 32-bit process running under WOW64. false otherwise,
/// including when the WOW64 query fails. Mono builds always return true.
/// </returns>
public bool Allow64bitJobs()
{
#if !__MonoCS__
	// 64-bit process running in 64-bit Windows
	if( IntPtr.Size == 8 )
	{
		return true;
	}

	bool bIsUsing64bitOperatingSystem = false;
	if( IntPtr.Size == 4 )
	{
		// 32-bit process: check if we're using WOW64 to run on 64-bit Windows.
		// Dispose the Process so the native handle opened by its Handle property is
		// released now rather than whenever the finalizer runs.
		using( Process CurrentProcess = Process.GetCurrentProcess() )
		{
			if( !IsWow64Process( CurrentProcess.Handle, out bIsUsing64bitOperatingSystem ) )
			{
				// Got some error, assume we're not running on 64-bit Windows.
				bIsUsing64bitOperatingSystem = false;
			}
		}
	}
	return bIsUsing64bitOperatingSystem;
#else
	return true;
#endif
}
|
|
|
|
/// <summary>
/// Records the 32-bit and 64-bit Job specifications and caches the files they need.
/// On success the Job moves to AGENT_JOB_PENDING.
/// </summary>
/// <remarks>
/// This Agent runs the 64-bit specification when the OS allows 64-bit jobs and one was
/// supplied; otherwise it runs the 32-bit one.
/// - The Instigator (LocalConnection owner) caches files for every supplied specification,
///   so it can hand either version to remote Agents.
/// - A remote Agent only fetches the files for the specification it will run.
/// </remarks>
/// <returns>
/// Constants.SUCCESS on success.
/// Constants.INVALID if a remote Agent has no specification it can run.
/// Constants.ERROR_CHANNEL_NOT_FOUND if the job files could not be cached.
/// </returns>
public Int32 BeginJobSpecification( AgentJobSpecification NewJobSpecification32, Hashtable NewJobDescription32, AgentJobSpecification NewJobSpecification64, Hashtable NewJobDescription64 )
{
	bool bAllow64bitJobs = Allow64bitJobs();
	bool bUse64bitJob = bAllow64bitJobs && ( NewJobSpecification64 != null );
	AgentJobSpecification SelectedSpecification = bUse64bitJob ? NewJobSpecification64 : NewJobSpecification32;

	// Using the provided specifications, request all necessary Job files. For local
	// Agents, the executable and dependencies must already exist in the cache. For
	// remote Agents, we'll need to request the necessary files from the Instigator.
	bool bReceivedJobFiles = false;

	// Am I the instigator?
	if( Owner is LocalConnection )
	{
		// Request the files for both job specifications (so we can hand out either version to remote agents).
		if( NewJobSpecification32 != null )
		{
			bReceivedJobFiles = RequestJobFiles( Owner, NewJobSpecification32 );
		}
		if( ( NewJobSpecification64 != null ) && ( ( NewJobSpecification32 == null ) || bReceivedJobFiles ) )
		{
			bReceivedJobFiles = RequestJobFiles( Owner, NewJobSpecification64 );
		}
	}
	else
	{
		// A remote Agent only needs the files for the specification it will run. That
		// specification can be missing, e.g. when only a 64-bit one was sent to a machine
		// that cannot run 64-bit jobs. RequestJobFiles would dereference null in that case.
		if( SelectedSpecification == null )
		{
			Manager.Log( EVerbosityLevel.Critical, ELogColour.Red, "[Job] BeginJobSpecification: No job specification usable on this machine was provided" );
			return Constants.INVALID;
		}
		bReceivedJobFiles = RequestJobFiles( Owner, SelectedSpecification );
	}

	if( !bReceivedJobFiles )
	{
		Manager.Log( EVerbosityLevel.Critical, ELogColour.Red, "[Job] BeginJobSpecification: Failed to cache necessary job files" );
		return Constants.ERROR_CHANNEL_NOT_FOUND;
	}

	// Files are in place: remember both specifications (the Instigator forwards them to
	// remote Agents) and the one this Agent will run, then update the job state
	Specification32 = NewJobSpecification32;
	Specification64 = NewJobSpecification64;
	Description32 = NewJobDescription32;
	Description64 = NewJobDescription64;

	Specification = SelectedSpecification;
	Description = bUse64bitJob ? NewJobDescription64 : NewJobDescription32;

	CurrentState = JobState.AGENT_JOB_PENDING;

	return Constants.SUCCESS;
}
|
|
|
|
/// <summary>
/// Ensures every dependency listed by a Task specification is available to
/// RequestingConnection. All Task dependencies are treated as required.
/// </summary>
/// <returns>
/// false as soon as one dependency cannot be obtained. true otherwise, including when the
/// Task has no dependency list.
/// </returns>
private bool RequestTaskFiles( Connection RequestingConnection, AgentTaskSpecification TaskSpecification )
{
	if( TaskSpecification.Dependencies == null )
	{
		return true;
	}

	foreach( string Dependency in TaskSpecification.Dependencies )
	{
		bool bDependencyAvailable = RequestDependency( RequestingConnection, Dependency, true );
		if( !bDependencyAvailable )
		{
			return false;
		}
	}
	return true;
}
|
|
|
|
/// <summary>
/// Caches a new Task's files and, if that works, queues the Task and services any
/// outstanding Task reservations.
/// </summary>
/// <returns>
/// Constants.SUCCESS, or Constants.ERROR_CHANNEL_NOT_FOUND when the Task's files could
/// not be cached.
/// </returns>
public Int32 AddTask( AgentTaskSpecification NewTaskSpecification )
{
	// For local Agents the Task's dependencies must already be cached; remote Agents
	// request any missing ones from the Instigator.
	if( !RequestTaskFiles( Owner, NewTaskSpecification ) )
	{
		Manager.Log( EVerbosityLevel.Critical, ELogColour.Red, "[Job] AddTask: Failed to cache necessary task files" );
		return Constants.ERROR_CHANNEL_NOT_FOUND;
	}

	lock( PendingTasks )
	{
		Manager.Log( EVerbosityLevel.Verbose, ELogColour.Green, "[Job] AddTask: Adding \"" + NewTaskSpecification.Parameters + "\" (" + NewTaskSpecification.TaskGuid.ToString() + ") with cost " + NewTaskSpecification.Cost.ToString() );

		// Queue the Task
		PendingTasks.Push( new AgentTask( NewTaskSpecification ) );
		TaskCount++;

		// Sanity checks: outstanding reservations are only expected while at most one
		// Task is pending
		Debug.Assert( ( PendingTasks.Count <= 1 ) || ( TaskReservationCount == 0 ) );
		Debug.Assert( ( TaskReservationCount <= 0 ) || ( PendingTasks.Count == 1 ) );
	}

	// After adding a new Task, check for any outstanding reservations
	CheckForReservations();

	return Constants.SUCCESS;
}
|
|
|
|
/// <summary>
/// Sends the Job's final outcome to the owning connection.
/// </summary>
/// <remarks>
/// Only the Instigator may call this; remote workers don't have enough information to
/// determine the outcome. Nothing is sent while CurrentSuccessState is still
/// AGENT_JOB_INCOMPLETE. Otherwise, the info message (if any) is logged and sent first,
/// followed by an AgentJobState carrying the Job's running time and exit code.
/// </remarks>
/// <param name="AdditionalInfoMessage">Optional description of the outcome; may be null.</param>
private void SendJobCompletedMessage( AgentInfoMessage AdditionalInfoMessage )
{
	// Only send a message back if we're the Instigator - remote workers don't have
	// enough information to make this determination
	Debug.Assert( OwnerIsInstigator );

	AgentJobState UpdatedStateMessage;
	ELogColour OutcomeColour;
	JobSuccessState SuccessState = CurrentSuccessState;
	if( SuccessState == JobSuccessState.AGENT_JOB_SUCCESS )
	{
		UpdatedStateMessage = new AgentJobState( JobGuid, EJobTaskState.STATE_COMPLETE_SUCCESS );
		OutcomeColour = ELogColour.Green;
	}
	else if( SuccessState == JobSuccessState.AGENT_JOB_FAILURE )
	{
		UpdatedStateMessage = new AgentJobState( JobGuid, EJobTaskState.STATE_COMPLETE_FAILURE );
		OutcomeColour = ELogColour.Red;
	}
	else
	{
		// Only if we have an actual update should we send one
		return;
	}

	// Set the running time and the exit code, if there is one
	TimeSpan JobRunningTime = DateTime.UtcNow - StartTime;
	UpdatedStateMessage.JobRunningTime = JobRunningTime.TotalSeconds;
	UpdatedStateMessage.JobExitCode = ProcessObjectExitCode;

	// The info message is optional, so it must not be dereferenced unless present
	if( AdditionalInfoMessage != null )
	{
		Manager.Log( EVerbosityLevel.Informative, OutcomeColour, "[Job] " + AdditionalInfoMessage.TextMessage );
		Manager.SendMessageInternal( Owner, AdditionalInfoMessage );
	}
	Manager.SendMessageInternal( Owner, UpdatedStateMessage );
}
|
|
|
|
/// <summary>
/// Serialises access to ProcessObjectOutputLines. The stdout and stderr handlers below
/// are raised by independent asynchronous readers and can run at the same time.
/// </summary>
private readonly Object ProcessObjectOutputLinesLock = new Object();

/// <summary>
/// Logs one line of job-application output while honouring
/// DeveloperOptions.MaximumJobApplicationLogLines.
/// </summary>
/// <remarks>
/// - Lines below the limit are logged at Informative verbosity.
/// - When the limit is reached, a truncation notice is logged once.
/// - That line and every later line are still logged, but at ExtraVerbose.
/// stdout and stderr share a single line counter.
/// </remarks>
/// <param name="Line">Event data from the process; null Data (end of stream) is ignored.</param>
/// <param name="LineColour">Colour used for the line itself.</param>
private void LogJobApplicationOutputLine( DataReceivedEventArgs Line, ELogColour LineColour )
{
	if( ( Line == null ) || ( Line.Data == null ) )
	{
		return;
	}

	// Claim this line's position atomically, so the truncation notice is emitted exactly
	// once even when stdout and stderr lines arrive concurrently
	bool bUnderLimit;
	bool bAtLimit;
	lock( ProcessObjectOutputLinesLock )
	{
		bUnderLimit = ProcessObjectOutputLines < AgentApplication.DeveloperOptions.MaximumJobApplicationLogLines;
		bAtLimit = ProcessObjectOutputLines == AgentApplication.DeveloperOptions.MaximumJobApplicationLogLines;
		ProcessObjectOutputLines++;
	}

	if( bUnderLimit )
	{
		Manager.Log( EVerbosityLevel.Informative, LineColour, Line.Data );
		return;
	}

	if( bAtLimit )
	{
		Manager.Log( EVerbosityLevel.Informative, ELogColour.Blue, "Maximum output from job application received (defined in the Agent Settings)" );
		Manager.Log( EVerbosityLevel.Informative, ELogColour.Blue, "Remaining output from job application truncated - see the job log for complete output" );
	}

	// Send it along anyway, but mark it as the highest level of verbosity
	Manager.Log( EVerbosityLevel.ExtraVerbose, LineColour, Line.Data );
}

/// <summary>
/// Process.OutputDataReceived handler: logs a stdout line from the Job executable in blue.
/// </summary>
public void OutputReceivedDataEventHandler( Object Sender, DataReceivedEventArgs Line )
{
	LogJobApplicationOutputLine( Line, ELogColour.Blue );
}

/// <summary>
/// Process.ErrorDataReceived handler: logs a stderr line from the Job executable in red.
/// </summary>
public void ErrorReceivedDataEventHandler( Object Sender, DataReceivedEventArgs Line )
{
	LogJobApplicationOutputLine( Line, ELogColour.Red );
}
|
|
|
|
/// <summary>
/// Guards PostJobStatsToDB so concurrent callers cannot post the same Job twice.
/// </summary>
private readonly Object PostJobStatsToDBLock = new Object();

/// <summary>
/// Best-effort upload of this Job's summary statistics to the SwarmStats database,
/// through the dbo.AddJob_v1 stored procedure.
/// </summary>
/// <remarks>
/// Posts only when all of these hold:
/// - the Job is AGENT_JOB_CLOSED;
/// - its process has exited;
/// - Properties.Settings.Default.StatsDatabaseName is set;
/// - no earlier post succeeded.
/// Any failure, including an unreachable database (3 second connection timeout), is logged
/// at Verbose level and otherwise ignored, so a later call may try again.
/// </remarks>
private void PostJobStatsToDB()
{
	try
	{
		bool ShouldUpdateDB = false;
		lock( CurrentStateLock )
		{
			// If the job is closed and the process has exited, try to update the DB.
			// ProcessObject is null when no process was launched (manual start or a failed launch).
			ShouldUpdateDB = ( CurrentState == JobState.AGENT_JOB_CLOSED ) &&
							 ( ProcessObject != null ) &&
							 ( ProcessObject.HasExited == true );
		}

		lock( PostJobStatsToDBLock )
		{
			// Post at most once (successfully), and only if a DB name is configured
			if( ( !ShouldUpdateDB ) ||
				( JobStatsPostedToDB ) ||
				String.IsNullOrEmpty( Properties.Settings.Default.StatsDatabaseName ) )
			{
				return;
			}

			// Build connection string using built-in Windows Authentication and lower the connection timeout to 3 seconds
			string ConnectionString = String.Format( "Data Source={0};Initial Catalog=SwarmStats;Trusted_Connection=Yes;Connection Timeout=3;", Properties.Settings.Default.StatsDatabaseName );

			using( SqlConnection PerfDBConnection = new SqlConnection( ConnectionString ) )
			using( SqlCommand SendSummaryCommand = new SqlCommand( "dbo.AddJob_v1", PerfDBConnection ) )
			{
				SendSummaryCommand.CommandType = System.Data.CommandType.StoredProcedure;

				// WARNING: DO NOT MODIFY THE BELOW WITHOUT UPDATING THE DB, BUMPING THE VERSION AND REGENERATE THE CREATE SCRIPTS

				// Values are sent as parameters rather than spliced into the SQL text: free-form
				// fields (executable parameters, map names, ...) may contain quotes.
				AddJobStatsParameter( SendSummaryCommand, "@Duration", ( StopTime - StartTime ).TotalSeconds );
				AddJobStatsParameter( SendSummaryCommand, "@UserName", Environment.UserName.ToUpperInvariant() );
				AddJobStatsParameter( SendSummaryCommand, "@MachineName", System.Net.Dns.GetHostName().ToUpperInvariant() );
				AddJobStatsParameter( SendSummaryCommand, "@GroupName", AgentApplication.Options.AgentGroupName );
				AddJobStatsParameter( SendSummaryCommand, "@JobGUID", JobGuid.ToString() );
				AddJobStatsParameter( SendSummaryCommand, "@Instigator", OwnerIsInstigator );
				AddJobStatsParameter( SendSummaryCommand, "@DistributionEnabled", !AgentApplication.Options.EnableStandaloneMode );
				AddJobStatsParameter( SendSummaryCommand, "@RemoteMachineCount", Owner.RemoteChildrenSeen );
				AddJobStatsParameter( SendSummaryCommand, "@ExecutableName", Specification.ExecutableName );
				AddJobStatsParameter( SendSummaryCommand, "@Parameters", Specification.Parameters );
				AddJobStatsParameter( SendSummaryCommand, "@JobWasSuccess", ( CurrentSuccessState == JobSuccessState.AGENT_JOB_SUCCESS ) );
				AddJobStatsParameter( SendSummaryCommand, "@TotalTaskCount", TaskCount );
				AddJobStatsParameter( SendSummaryCommand, "@DistributedTaskCount", TaskCountRemote );
				AddJobStatsParameter( SendSummaryCommand, "@RestartedTaskCount", TaskCountRequeue );
				AddJobStatsParameter( SendSummaryCommand, "@CacheHitRate", ( CacheRequests > 0 ) ? ( ( ( float )CacheHits / ( float )CacheRequests ) * 100.0f ) : 0.0f );
				AddJobStatsParameter( SendSummaryCommand, "@NetworkBytesSent", NetworkBytesSent );
				AddJobStatsParameter( SendSummaryCommand, "@NetworkBytesReceived", NetworkBytesReceived );
				AddJobStatsParameter( SendSummaryCommand, "@PeakVirtualMemoryUsed", ProcessObjectPeakVirtualMemorySize64 );
				AddJobStatsParameter( SendSummaryCommand, "@PeakPhysicalMemoryUsed", ProcessObjectPeakWorkingSet64 );
				AddJobStatsParameter( SendSummaryCommand, "@TotalProcessorTime", ProcessObject.TotalProcessorTime.TotalSeconds );
				AddJobStatsParameter( SendSummaryCommand, "@ExitCode", ProcessObjectExitCode );
				AddJobStatsParameter( SendSummaryCommand, "@QualityName", ( Description.Contains( "QualityLevel" ) ? Description["QualityLevel"] as string : "" ) );
				AddJobStatsParameter( SendSummaryCommand, "@GameName", ( Description.Contains( "GameName" ) ? Description["GameName"] as string : "" ) );
				AddJobStatsParameter( SendSummaryCommand, "@MapName", ( Description.Contains( "MapName" ) ? Description["MapName"] as string : "" ) );

				// Describe the call for the log in the familiar "EXEC ... @Name='Value'," form
				string CommandDescription = "EXEC dbo.AddJob_v1";
				foreach( SqlParameter Parameter in SendSummaryCommand.Parameters )
				{
					CommandDescription += String.Format( " {0}='{1}',", Parameter.ParameterName, Parameter.Value );
				}
				CommandDescription = CommandDescription.TrimEnd( ',' );

				// Open connection to DB and execute stored procedure adding build summary information
				PerfDBConnection.Open();
				Manager.Log( EVerbosityLevel.Verbose, ELogColour.Green, "[PostJobStatsToDB] Posting stats to DB" );
				Manager.Log( EVerbosityLevel.Verbose, ELogColour.Green, "[PostJobStatsToDB] " + CommandDescription );
				SendSummaryCommand.ExecuteNonQuery();
			}

			JobStatsPostedToDB = true;
		}
	}
	// Catch exceptions here instead of at higher level as we don't care if DB connection fails.
	catch( Exception Ex )
	{
		Manager.Log( EVerbosityLevel.Verbose, ELogColour.Orange, "[PostJobStatsToDB] Database error:" );
		Manager.Log( EVerbosityLevel.Verbose, ELogColour.Orange, "[PostJobStatsToDB] " + Ex.Message );
	}
}

/// <summary>
/// Adds one dbo.AddJob_v1 argument as text. The stored procedure receives the same text the
/// value previously had as a quoted literal, so server-side conversions are unchanged.
/// </summary>
/// <remarks>
/// Numbers are formatted with the invariant culture, so a decimal comma cannot break the
/// conversion. A null value becomes an empty string, matching String.Format.
/// </remarks>
private static void AddJobStatsParameter( SqlCommand Command, string Name, Object Value )
{
	string ValueText = String.Format( System.Globalization.CultureInfo.InvariantCulture, "{0}", Value );
	Command.Parameters.AddWithValue( Name, ValueText );
}
|
|
|
|
/// <summary>
/// Process.Exited handler for the Job executable.
/// </summary>
/// <remarks>
/// Notifications from any process other than ProcessObject are ignored. Otherwise the
/// handler records the exit code. Unless the success state was already decided by other
/// means, it derives the state from the exit code:
/// - Agent-managed, Task-based Jobs (no FLAG_MANUAL_START, TaskCount > 0) are only marked
///   as failed here, on a non-zero exit code. Success is left for CloseJob to decide.
/// - Every other Job succeeds on exit code 0 and fails otherwise.
/// When the outcome is decided here, the Instigator also sends the completion message.
/// Finally, the handler attempts to post the Job stats.
/// </remarks>
public void ExitedProcessEventHandler( Object Sender, EventArgs Args )
{
	// Verify that the process is the one we think it is
	if( ProcessObject != ( Sender as Process ) )
	{
		return;
	}
	Debug.Assert( ProcessObject.HasExited );

	// Grab any additional data from the ProcessObject before we let it go
	Int32 ProcessExitCode = ProcessObject.ExitCode;
	ProcessObjectExitCode = ProcessExitCode;

	lock( CurrentSuccessStateLock )
	{
		// Only update if the job state has not been determined by other means
		if( CurrentSuccessState == JobSuccessState.AGENT_JOB_INCOMPLETE )
		{
			bool IsAgentManagedTaskJob =
				( ( Specification.JobFlags & EJobTaskFlags.FLAG_MANUAL_START ) == 0 ) &&
				( TaskCount > 0 );

			string OutcomeText = null;
			if( IsAgentManagedTaskJob )
			{
				// A dirty exit is a failure; otherwise wait for CloseJob to decide on success
				if( ProcessExitCode != 0 )
				{
					CurrentSuccessState = JobSuccessState.AGENT_JOB_FAILURE;
					OutcomeText = "Job has failed! Job executable didn't exit cleanly. Exit code: " + ProcessExitCode.ToString();
				}
			}
			else if( ProcessExitCode == 0 )
			{
				// Non-Task-based or manually managed Job: the exit code alone decides
				CurrentSuccessState = JobSuccessState.AGENT_JOB_SUCCESS;
				OutcomeText = "Job is a success!";
			}
			else
			{
				CurrentSuccessState = JobSuccessState.AGENT_JOB_FAILURE;
				OutcomeText = "Job has failed! Job executable has exited with a non-zero exit code";
			}

			// Log and send an INFO message describing the outcome
			if( ( OutcomeText != null ) && OwnerIsInstigator )
			{
				SendJobCompletedMessage( new AgentInfoMessage( OutcomeText ) );
			}
		}
	}

	// Attempt to report the final stats to the DB
	PostJobStatsToDB();
}
|
|
|
|
/// <summary>
/// Starts the Job and moves it to AGENT_JOB_RUNNING.
/// </summary>
/// <remarks>
/// - Copies the Job's dependencies into its job folder.
/// - Launches the executable, unless FLAG_MANUAL_START asks the user to start it.
/// - For the Instigator, also tells the owner the Job is running and may request that the
///   Agent window be shown.
/// </remarks>
/// <returns>
/// Constants.SUCCESS when the Job is running.
/// Constants.INVALID when the executable could not be started.
/// Constants.ERROR_EXCEPTION if anything threw.
/// </returns>
private Int32 StartJobExecution()
{
	Int32 ErrorCode = Constants.INVALID;
	try
	{
		string JobsFolder = Path.Combine( AgentApplication.Options.CacheFolder, "Jobs" );
		string JobSpecificFolder = Path.Combine( JobsFolder, "Job-" + JobGuid.ToString() );

		// Assert that the folders we need are there
		Debug.Assert( Directory.Exists( JobsFolder ) &&
					  Directory.Exists( JobSpecificFolder ) );

		// Copy all required files into the Job directory
		CopyJobDependencies( JobSpecificFolder );

		// First determine if we're even supposed to start the executable
		EJobTaskFlags JobFlags = Specification.JobFlags;
		bool JobStartedSuccessfully;
		if( ( JobFlags & EJobTaskFlags.FLAG_MANUAL_START ) != 0 )
		{
			// User has asked to start the executable on their own (useful
			// for debugging). At connection time, we'll try to match the
			// full executable path name to establish the parent
			// connection. Simply update the state of the Job.
			JobStartedSuccessfully = true;
		}
		else
		{
			JobStartedSuccessfully = LaunchJobProcess( JobSpecificFolder );
		}

		// If the job was started successfully, however that may be, update
		// the state of the job
		if( JobStartedSuccessfully )
		{
			CurrentState = JobState.AGENT_JOB_RUNNING;
			StartTime = DateTime.UtcNow;
			ErrorCode = Constants.SUCCESS;
		}

		// Send a message indicating the Job has started, if this is the original owner
		if( ( OwnerIsInstigator ) &&
			( CurrentState == JobState.AGENT_JOB_RUNNING ) )
		{
			AgentJobState JobStartedMessage = new AgentJobState( JobGuid, EJobTaskState.STATE_RUNNING );
			Manager.SendMessageInternal( Owner, JobStartedMessage );

			// Also, if enabled, request that the agent window be brought
			// to the front, but only for the Instigator
			if( AgentApplication.Options.BringToFront && ( ( JobFlags & EJobTaskFlags.FLAG_MINIMIZED ) == 0 ) )
			{
				AgentApplication.ShowWindow = true;
			}
		}
	}
	catch( Exception Ex )
	{
		// Be sure to null out the process object if anything went wrong (usually starting the process)
		ProcessObject = null;

		Manager.Log( EVerbosityLevel.Critical, ELogColour.Red, "[StartJobExecution] Exception was: " + Ex.ToString() );
		ErrorCode = Constants.ERROR_EXCEPTION;
	}

	return ( ErrorCode );
}

/// <summary>
/// Copies every cached dependency (Specification.DependenciesOriginalNames: cache name to
/// original name) into the job folder under its original name. Files failing the security
/// checks are skipped and logged.
/// </summary>
private void CopyJobDependencies( string JobSpecificFolder )
{
	string JobFolderPrefix = Path.GetFullPath( JobSpecificFolder ).TrimEnd( Path.DirectorySeparatorChar ) + Path.DirectorySeparatorChar;

	foreach( KeyValuePair<string, string> Pair in Specification.DependenciesOriginalNames )
	{
		string SrcFile = Path.Combine( AgentApplication.Options.CacheFolder, Pair.Key );
		string DestFile = Path.Combine( JobSpecificFolder, Pair.Value );

		// The original name comes from the job specification. Refuse names that would put the
		// copy outside the job folder (rooted paths or ".." segments).
		bool bStaysInJobFolder = Path.GetFullPath( DestFile ).StartsWith( JobFolderPrefix, StringComparison.OrdinalIgnoreCase );

		if( bStaysInJobFolder && IsJobFileAuthorized( SrcFile, DestFile ) )
		{
			File.Copy( SrcFile, DestFile, true );
		}
		else
		{
			Manager.Log( EVerbosityLevel.Informative, ELogColour.Red, "[Job] Failed to use file \"" + Pair.Key + "\" because of a security violation" );
		}
	}
}

/// <summary>
/// Security check run before a file is copied into the job folder.
/// </summary>
/// <remarks>
/// When the Agent itself is signed (Manager.Certificate != null), executables and DLLs must
/// be signed with the same certificate. Microsoft redistributables (names starting with
/// "msvc") are exempt, because they won't be signed by Epic.
/// </remarks>
/// <param name="SrcFile">Cached copy of the file.</param>
/// <param name="DestFile">Path the copy will have in the job folder.</param>
private bool IsJobFileAuthorized( string SrcFile, string DestFile )
{
	// Compare extensions case-insensitively and on both names. Windows runs "TOOL.EXE" just
	// as happily, and the destination copy is what actually gets loaded.
	string SrcExtension = Path.GetExtension( SrcFile );
	string DestExtension = Path.GetExtension( DestFile );
	bool bNeedToCheckFile =
		String.Equals( SrcExtension, ".exe", StringComparison.OrdinalIgnoreCase ) ||
		String.Equals( SrcExtension, ".dll", StringComparison.OrdinalIgnoreCase ) ||
		String.Equals( DestExtension, ".exe", StringComparison.OrdinalIgnoreCase ) ||
		String.Equals( DestExtension, ".dll", StringComparison.OrdinalIgnoreCase );

	// Microsoft redistributable binaries won't be signed by Epic, so allow them to slide.
	// NOTE(review): this exemption trusts the cached file's name alone.
	if( Path.GetFileName( SrcFile ).StartsWith( "msvc", StringComparison.InvariantCultureIgnoreCase ) )
	{
		bNeedToCheckFile = false;
	}

	if( ( !bNeedToCheckFile ) || ( Manager.Certificate == null ) )
	{
		return true;
	}

	// If the Agent is signed, then everything else must be signed as well
	X509Certificate NextCertificate = null;
	try
	{
		NextCertificate = X509Certificate.CreateFromSignedFile( SrcFile );
	}
	catch( Exception )
	{
		// Any exception means that either the file isn't signed or has an invalid certificate
	}
	return ( NextCertificate != null ) && NextCertificate.Equals( Manager.Certificate );
}

/// <summary>
/// Launches the Job executable from the job folder.
/// </summary>
/// <remarks>
/// - stdout/stderr are redirected to the output handlers.
/// - Swarm_MaxCores is set in the environment.
/// - Priority comes from the Agent options.
/// ProcessObject is set before the process is started.
/// </remarks>
/// <returns>true if the process was started; false otherwise.</returns>
private bool LaunchJobProcess( string JobSpecificFolder )
{
	string OriginalExecutableName;
	if( !Specification.DependenciesOriginalNames.TryGetValue( Specification.ExecutableName, out OriginalExecutableName ) )
	{
		// Nothing to launch; make the failure visible instead of silently returning
		Manager.Log( EVerbosityLevel.Critical, ELogColour.Red, "[Job] Failed to launch Job: executable \"" + Specification.ExecutableName + "\" is not among the job dependencies" );
		return false;
	}

	string FullExecutableName = Path.Combine( JobSpecificFolder, OriginalExecutableName );

	Process TaskProcess = new Process();
	TaskProcess.StartInfo.FileName = FullExecutableName;
	TaskProcess.StartInfo.Arguments = Specification.Parameters;
	TaskProcess.StartInfo.WorkingDirectory = JobSpecificFolder;
	TaskProcess.StartInfo.CreateNoWindow = true;

	// Set up the redirect for output
	TaskProcess.StartInfo.UseShellExecute = false;
	TaskProcess.StartInfo.RedirectStandardOutput = true;
	TaskProcess.StartInfo.RedirectStandardError = true;
	TaskProcess.OutputDataReceived += new DataReceivedEventHandler( OutputReceivedDataEventHandler );
	TaskProcess.ErrorDataReceived += new DataReceivedEventHandler( ErrorReceivedDataEventHandler );

	// Set the number of allowed cores to use, which should be respected by swarm-aware applications
	if( OwnerIsInstigator )
	{
		// Use local settings, if it's not already set
		if( !TaskProcess.StartInfo.EnvironmentVariables.ContainsKey( "Swarm_MaxCores" ) )
		{
			TaskProcess.StartInfo.EnvironmentVariables.Add( "Swarm_MaxCores", AgentApplication.DeveloperOptions.LocalJobsDefaultProcessorCount.ToString() );
		}
	}
	else
	{
		// Use remote settings. Assign through the indexer, because Add() throws when the
		// inherited environment already defines Swarm_MaxCores.
		TaskProcess.StartInfo.EnvironmentVariables["Swarm_MaxCores"] = AgentApplication.DeveloperOptions.RemoteJobsDefaultProcessorCount.ToString();
	}

	// Set up our exited callback
	TaskProcess.Exited += new EventHandler( ExitedProcessEventHandler );
	TaskProcess.EnableRaisingEvents = true;

	ProcessObject = TaskProcess;
	if( !TaskProcess.Start() )
	{
		return false;
	}

	if( !OwnerIsInstigator )
	{
		// Use remote settings
		TaskProcess.PriorityClass = ( ProcessPriorityClass )AgentApplication.DeveloperOptions.RemoteJobsDefaultProcessPriority;
	}
	else if( AgentApplication.Options.AvoidLocalExecution )
	{
		// If we're avoiding local execution, make it as non-invasive as possible
		TaskProcess.PriorityClass = ProcessPriorityClass.Idle;
	}
	else
	{
		// Use local settings
		TaskProcess.PriorityClass = ( ProcessPriorityClass )AgentApplication.DeveloperOptions.LocalJobsDefaultProcessPriority;
	}

	TaskProcess.BeginOutputReadLine();
	TaskProcess.BeginErrorReadLine();

	Manager.Log( EVerbosityLevel.Informative, ELogColour.Green, "[Job] Launched Job " + Specification.ExecutableName );
	Manager.Log( EVerbosityLevel.Informative, ELogColour.Green, "[Job] PID is " + TaskProcess.Id.ToString() );
	Manager.Log( EVerbosityLevel.Informative, ELogColour.Green, "[Job] GUID is \"" + JobGuid.ToString() + "\"" );

	return true;
}
|
|
|
|
/// <summary>
/// Work item queued for OpenJobOnRemoteThreadProc: the Job to start and the remote
/// connection to start it on.
/// </summary>
public class OpenJobOnRemoteData
{
	/// <summary>
	/// Pairs a Job with the remote connection that should run it.
	/// </summary>
	public OpenJobOnRemoteData( AgentJob NewJob, RemoteConnection NewRemote )
	{
		this.Remote = NewRemote;
		this.Job = NewJob;
	}

	/// <summary>The Job being distributed.</summary>
	public AgentJob Job;

	/// <summary>The remote Agent connection the Job is started on.</summary>
	public RemoteConnection Remote;
}
|
|
|
|
/// <summary>
/// Thread-pool worker that starts a Job on one remote Agent.
/// </summary>
/// <remarks>
/// Calls, in order:
/// 1. OpenJob.
/// 2. BeginJobSpecification, with both the 32-bit and 64-bit specifications.
/// 3. In deterministic mode, AddTask for every Task GetNextTask hands this remote.
/// 4. EndJobSpecification.
/// If any step reports an error or throws, the remote connection is closed.
/// </remarks>
/// <param name="ObjectState">An OpenJobOnRemoteData; anything else is ignored.</param>
static void OpenJobOnRemoteThreadProc( Object ObjectState )
{
	OpenJobOnRemoteData MethodData = ObjectState as OpenJobOnRemoteData;
	if( MethodData == null )
	{
		return;
	}

	AgentJob Job = MethodData.Job;
	RemoteConnection Remote = MethodData.Remote;

	Int32 JobErrorCode = Constants.INVALID;
	try
	{
		Hashtable OpenJobInParameters = new Hashtable();
		OpenJobInParameters["Version"] = ESwarmVersionValue.VER_1_0;
		OpenJobInParameters["JobGuid"] = Job.JobGuid;
		Hashtable OpenJobOutParameters = null;

		JobErrorCode = Remote.Interface.OpenJob( Remote.Handle, OpenJobInParameters, ref OpenJobOutParameters );
		if( JobErrorCode >= 0 )
		{
			Hashtable BeginInParameters = new Hashtable();
			BeginInParameters["Version"] = ESwarmVersionValue.VER_1_0;
			BeginInParameters["Specification32"] = Job.Specification32;
			BeginInParameters["Specification64"] = Job.Specification64;
			BeginInParameters["Description32"] = Job.Description32;
			BeginInParameters["Description64"] = Job.Description64;
			Hashtable BeginOutParameters = null;

			JobErrorCode = Remote.Interface.BeginJobSpecification( Remote.Handle, BeginInParameters, ref BeginOutParameters );

			// If the job is in deterministic mode, add all this agent's tasks now
			// so that if, for any reason, this connection is lost or fails, then
			// all remaining tasks assigned here will fail
			if( ( JobErrorCode >= 0 ) && Job.DeterministicModeEnabled )
			{
				AgentTaskSpecification NextTask = Job.GetNextTask( Remote ) as AgentTaskSpecification;
				while( NextTask != null )
				{
					Hashtable AddTaskInParameters = new Hashtable();
					AddTaskInParameters["Version"] = ESwarmVersionValue.VER_1_0;
					AddTaskInParameters["Specification"] = NextTask;
					Hashtable AddTaskOutParameters = null;

					JobErrorCode = Remote.Interface.AddTask( Remote.Handle, AddTaskInParameters, ref AddTaskOutParameters );
					NextTask = ( JobErrorCode >= 0 ) ? ( Job.GetNextTask( Remote ) as AgentTaskSpecification ) : null;
				}
			}

			if( JobErrorCode >= 0 )
			{
				Hashtable EndJobInParameters = null;
				Hashtable EndJobOutParameters = null;
				JobErrorCode = Remote.Interface.EndJobSpecification( Remote.Handle, EndJobInParameters, ref EndJobOutParameters );
				if( JobErrorCode >= 0 )
				{
					Job.Manager.Log( EVerbosityLevel.Informative, ELogColour.Green, "[EndJobSpecification] Started job on remote connection" );
				}
			}
		}
	}
	catch( Exception Ex )
	{
		// This runs on a thread-pool thread, where an unhandled exception terminates the
		// whole Agent process. Treat a failure talking to the remote as a failed start.
		Job.Manager.Log( EVerbosityLevel.Informative, ELogColour.Orange, "[OpenJobOnRemoteThreadProc] Exception was: " + Ex.ToString() );
		JobErrorCode = Constants.ERROR_EXCEPTION;
	}

	if( JobErrorCode < 0 )
	{
		Job.Manager.Log( EVerbosityLevel.Informative, ELogColour.Orange, "[EndJobSpecification] Tried to begin the job on a valid remote connection, but failed" );
		Hashtable CloseOwnerInParameters = null;
		Hashtable CloseOwnerOutParameters = null;
		Job.Manager.CloseConnection( Remote.Handle, CloseOwnerInParameters, ref CloseOwnerOutParameters );
	}
}
|
|
|
|
static void OpenJobOnRemotesThreadProc( Object ObjectState )
{
	// Thread pool entry point used by the Instigator to bring remote agents into a job.
	// ObjectState must be the AgentJob being distributed. In deterministic mode every
	// remote child already attached to the job owner is started immediately; otherwise
	// remote agents are recruited one at a time for as long as the job is running and
	// there appear to be more pending tasks than threads available to run them.
	AgentJob Job = ObjectState as AgentJob;
	Debug.Assert( Job != null );
	Debug.Assert( Job.OwnerIsInstigator );

	LocalConnection LocalOwner = Job.Owner as LocalConnection;

	// Check if the job is in deterministic distribution mode
	if( Job.DeterministicModeEnabled )
	{
		// For each child, simply spawn the open job thread for it
		foreach( RemoteConnection Remote in Job.Owner.RemoteChildren.Values )
		{
			// For each remote connection, spawn a thread to start the job
			ThreadPool.QueueUserWorkItem(
				new WaitCallback( OpenJobOnRemoteThreadProc ),
				new OpenJobOnRemoteData( Job, Remote )
			);
		}
	}
	// Otherwise, we're in standard distribution mode, so only grab help as needed
	else
	{
		// Only start looking if the local executable alone cannot absorb the pending work
		bool KeepLookingForHelp = ( ( Job.CurrentState == JobState.AGENT_JOB_RUNNING ) &&
									( Job.PendingTasks.Count > AgentApplication.DeveloperOptions.LocalJobsDefaultProcessorCount ) &&
									( Job.Manager.ResetPotentialRemoteAgents( LocalOwner ) ) );
		while( KeepLookingForHelp )
		{
			// Get another agent and request help. Note that just getting a remote
			// agent back here assures that we've opened and validated a connection
			// with it and it should be ready to work for us
			RemoteConnection Remote = Job.Manager.GetNextRemoteAgent( LocalOwner );
			if( Remote != null )
			{
				// For each remote connection, spawn a thread to start the job
				ThreadPool.QueueUserWorkItem(
					new WaitCallback( OpenJobOnRemoteThreadProc ),
					new OpenJobOnRemoteData( Job, Remote )
				);
			}
			else
			{
				// Subsequent passes will trickle through the unavailable agents until
				// it exhausts the set. Try to requeue any unavailable remote agents
				// (the return value is not needed here; we retry after a short wait)
				Job.Manager.RetryUnavailableRemoteAgent( LocalOwner );
				Thread.Sleep( 1000 );
			}

			// Update our "keep running" condition. Estimate how many tasks can run at once:
			// the local executable's worker threads plus the remote executables' worker
			// threads on every remote child employed so far. Remote executables use the
			// remote processor count, not the local one.
			Int32 RunningThreads = AgentApplication.DeveloperOptions.LocalJobsDefaultProcessorCount +
								   AgentApplication.DeveloperOptions.RemoteJobsDefaultProcessorCount * Job.Owner.RemoteChildren.Count;
			KeepLookingForHelp = ( ( Job.CurrentState == JobState.AGENT_JOB_RUNNING ) &&
								   ( Job.PendingTasks.Count > RunningThreads ) );
		}

		// Done looking for help, make sure we clean up pending connections
		Job.Manager.AbandonPotentialRemoteAgents( LocalOwner );
	}
}
|
|
|
|
private bool ValidateDeterministicDistributionRequirements()
{
	// Decides whether this job may replay the task distribution of the last successful job.
	// Requirements, checked in order:
	//   1. a record of the last successful job exists
	//   2. the original executable names of both jobs can be looked up and are equal
	//   3. both jobs have the same tasks (by TaskGuid) with equal Parameters and Cost
	//   4. a golden task queue was recorded for every previous worker agent
	//   5. every previous worker agent other than this machine passes the allowed-agents
	//      filter and accepts a remote connection
	// Side effects: once requirement 3 holds, LastSuccessfulJobRecord.AgentToTaskQueueMapping
	// is rebuilt from the golden queues; requirement 5 opens remote connections.
	// Every failure path logs its reason. Returns true only if all requirements hold.

	// If there's no record of the last run, then fail immediately
	AgentJobRecord LastSuccessfulJobRecord = Manager.LastSuccessfulJobRecord;
	if( LastSuccessfulJobRecord == null )
	{
		Manager.Log( EVerbosityLevel.Critical, ELogColour.Red, "[Deterministic] Failed to validate Job, no record of a previous successful job" );
		return false;
	}

	bool DeterministicModeValidated = false;

	// First, compare the job specification using the original executable names
	bool ExecutableNameMatches = false;
	string ThisOriginalExecutableName;
	string LastOriginalExecutableName;
	if( ( Specification.DependenciesOriginalNames.TryGetValue( Specification.ExecutableName, out ThisOriginalExecutableName ) ) &&
		( LastSuccessfulJobRecord.Specification.DependenciesOriginalNames.TryGetValue( LastSuccessfulJobRecord.Specification.ExecutableName, out LastOriginalExecutableName ) ) )
	{
		if( ThisOriginalExecutableName == LastOriginalExecutableName )
		{
			ExecutableNameMatches = true;
		}
		else
		{
			Manager.Log( EVerbosityLevel.Critical, ELogColour.Red, "[Deterministic] Failed to validate Job, executable name is different" );
		}
	}
	else
	{
		Manager.Log( EVerbosityLevel.Critical, ELogColour.Red, "[Deterministic] Failed to validate Job, could not look up the original executable names" );
	}

	// If the executable name matches, check the set of tasks
	if( ExecutableNameMatches )
	{
		bool AllTasksMatch = false;

		// Next, make sure that all the tasks in the current specification are in the last job
		List<AgentTask> ListOfTasks = new List<AgentTask>( PendingTasks.ToArray() );
		if( ListOfTasks.Count == LastSuccessfulJobRecord.AllTasks.Count )
		{
			Int32 MatchedTaskCount = 0;
			foreach( AgentTask CurrentTask in ListOfTasks )
			{
				AgentTask RecordedTask;
				if( LastSuccessfulJobRecord.AllTasks.TryGetValue( CurrentTask.Specification.TaskGuid, out RecordedTask ) )
				{
					// For now, just compare the parameters and the cost
					if( ( RecordedTask.Specification.Parameters == CurrentTask.Specification.Parameters ) &&
						( RecordedTask.Specification.Cost == CurrentTask.Specification.Cost ) )
					{
						MatchedTaskCount++;
					}
					else
					{
						string OldTaskDetails = String.Format( "[Deterministic] Old Task: {0}, {1}, {2}",
							RecordedTask.Specification.TaskGuid,
							RecordedTask.Specification.Parameters,
							RecordedTask.Specification.Cost );
						string NewTaskDetails = String.Format( "[Deterministic] New Task: {0}, {1}, {2}",
							CurrentTask.Specification.TaskGuid,
							CurrentTask.Specification.Parameters,
							CurrentTask.Specification.Cost );
						Manager.Log( EVerbosityLevel.Critical, ELogColour.Red, "[Deterministic] Failed to validate Job, task is different" );
						Manager.Log( EVerbosityLevel.Critical, ELogColour.Red, OldTaskDetails );
						Manager.Log( EVerbosityLevel.Critical, ELogColour.Red, NewTaskDetails );
					}
				}
				else
				{
					// Previously this case failed validation without any explanation
					Manager.Log( EVerbosityLevel.Critical, ELogColour.Red, "[Deterministic] Failed to validate Job, task " + CurrentTask.Specification.TaskGuid + " is not in the last job" );
				}
			}

			if( MatchedTaskCount == LastSuccessfulJobRecord.AllTasks.Count )
			{
				// Found all current tasks in the last job record
				AllTasksMatch = true;

				// Now verify and initialize the running work queues from the golden copies
				LastSuccessfulJobRecord.AgentToTaskQueueMapping.Clear();
				foreach( string WorkerName in LastSuccessfulJobRecord.WorkerAgentNames )
				{
					Queue<AgentTask> TaskQueue;
					if( LastSuccessfulJobRecord.AgentToGoldenTaskQueueMapping.TryGetValue( WorkerName, out TaskQueue ) )
					{
						LastSuccessfulJobRecord.AgentToTaskQueueMapping.Add( WorkerName, new Queue<AgentTask>( TaskQueue ) );
					}
					else
					{
						// If we fail to set the queues, fail to validate
						Manager.Log( EVerbosityLevel.Critical, ELogColour.Red, "[Deterministic] Failed to validate Job, no task queue recorded for " + WorkerName );
						AllTasksMatch = false;
					}
				}
			}
		}
		else
		{
			Manager.Log( EVerbosityLevel.Critical, ELogColour.Red, "[Deterministic] Failed to validate Job, overall task count is different" );
		}

		// If all tasks are the same (enough), make sure we can acquire the same set of agents for the job
		if( AllTasksMatch )
		{
			// Try to connect to each of the remote agents from the previous job
			string LocalHostName = System.Net.Dns.GetHostName();
			Int32 WorkerAgentsEmployed = 0;
			for( Int32 i = 0; i < LastSuccessfulJobRecord.WorkerAgentNames.Count; i++ )
			{
				string WorkerAgentName = LastSuccessfulJobRecord.WorkerAgentNames[i];
				string WorkerAgentIPAddress = LastSuccessfulJobRecord.WorkerAgentIPAddresses[i];

				if( WorkerAgentName == LocalHostName )
				{
					// We always count ourselves
					WorkerAgentsEmployed++;
				}
				else if( !Manager.AgentNamePassesAllowedAgentsFilter( WorkerAgentName ) )
				{
					// A filtered-out agent is never connected, so its recorded task queue
					// could never be served; it must not count as employed
					Manager.Log( EVerbosityLevel.Critical, ELogColour.Red, "[Deterministic] Agent " + WorkerAgentName + " necessary for job is excluded by the allowed agents filter" );
				}
				else
				{
					// Make sure we can open the connection and get a valid remote connection back
					RemoteConnection Remote;
					AgentInfo NewWorkerInfo = new AgentInfo();
					NewWorkerInfo.Name = WorkerAgentName;
					NewWorkerInfo.Configuration["IPAddress"] = WorkerAgentIPAddress;
					if( ( Manager.TryOpenRemoteConnection( Owner as LocalConnection, NewWorkerInfo, false, DateTime.MinValue, out Remote ) == Constants.SUCCESS ) &&
						( Remote != null ) )
					{
						WorkerAgentsEmployed++;
					}
					else
					{
						Manager.Log( EVerbosityLevel.Critical, ELogColour.Red, "[Deterministic] Could not acquire connection to " + WorkerAgentName + " necessary for job" );
					}
				}
			}

			if( WorkerAgentsEmployed == LastSuccessfulJobRecord.WorkerAgentNames.Count )
			{
				// If all necessary agents are employed, we've verified everything
				// we can -> success
				Manager.Log( EVerbosityLevel.Informative, ELogColour.Green, "[Deterministic] Job validated, deterministic distribution enabled" );
				DeterministicModeValidated = true;
			}
			else
			{
				Manager.Log( EVerbosityLevel.Critical, ELogColour.Red, "[Deterministic] Failed to validate Job, could not acquire all agents necessary" );
			}
		}
	}

	return DeterministicModeValidated;
}
|
|
|
|
public Int32 EndJobSpecification()
{
	// Ends the specification phase of a PENDING job:
	//   - reorders the pending tasks (most expensive on top, striped across cost bands),
	//   - decides whether the job may start (deterministic replay must validate if requested),
	//   - starts the job and, when distribution is allowed and there is more work than the
	//     local executable can absorb, spawns remote recruitment.
	// Returns the result of StartJobExecution, or Constants.INVALID if the job was not
	// PENDING or was not allowed to start.
	Int32 ErrorCode = Constants.INVALID;

	// Only if the job is in the pending state can we start it running
	if( CurrentState == JobState.AGENT_JOB_PENDING )
	{
		// Sort the incoming task list by cost
		lock( PendingTasks )
		{
			List<AgentTask> ListOfTasksDescending = new List<AgentTask>( PendingTasks.ToArray() );
			ListOfTasksDescending.Sort( AgentTask.CompareTasksByCostDescending );

			List<AgentTask> StripedListOfTasks = ListOfTasksDescending;

			// Lightmass processing of a single task is multithreaded (see ProcessCacheIndirectLightingTask), so ideally we would hand out the most expensive tasks in a round robin ordering
			bool bDistributeMostExpensiveTasksRoundRobin = true;

			// Clamp to at least one: a zero processor count in the developer options would
			// otherwise cause a division by zero when computing the stripe count
			int TasksPerAgent = Math.Max( 1, Math.Max( AgentApplication.DeveloperOptions.LocalJobsDefaultProcessorCount, AgentApplication.DeveloperOptions.RemoteJobsDefaultProcessorCount ) );

			if( bDistributeMostExpensiveTasksRoundRobin && TasksPerAgent < ListOfTasksDescending.Count )
			{
				StripedListOfTasks = StripeTasksByCostRoundRobin( ListOfTasksDescending, TasksPerAgent );
			}

			// Queue the sorted list back up
			PendingTasks.Clear();
			for( int TaskIndex = StripedListOfTasks.Count - 1; TaskIndex >= 0; TaskIndex-- )
			{
				// Reverse the order as PendingTasks is a stack, the most expensive tasks should be on the top
				PendingTasks.Push( StripedListOfTasks[TaskIndex] );
			}
		}

		// We're pessimists
		bool AllowJobToStart = false;

		// Determine if we meet conditions for distribution
		bool DistributionConditionsMet = ( OwnerIsInstigator ) &&
										 ( AgentApplication.Options.EnableStandaloneMode == false ) &&
										 ( ( Specification.JobFlags & EJobTaskFlags.FLAG_MANUAL_START ) == 0 ) &&
										 ( ( Specification.JobFlags & EJobTaskFlags.FLAG_ALLOW_REMOTE ) != 0 );

		// Check if the user enabled deterministic distribution mode
		if( ( DistributionConditionsMet ) &&
			( AgentApplication.DeveloperOptions.ReplayLastDistribution ) )
		{
			// When replay is requested but cannot be validated, the job is deliberately not started
			if( ValidateDeterministicDistributionRequirements() )
			{
				// If enabled and verified, set the deterministic mode flag
				// and allow the job to start normally
				DeterministicModeEnabled = true;
				AllowJobToStart = true;
			}
		}
		else
		{
			// Otherwise, allow the job to start normally
			AllowJobToStart = true;
		}

		// If we should still start the job normally after the conditions above
		if( AllowJobToStart )
		{
			ErrorCode = StartJobExecution();
			if( ErrorCode >= 0 )
			{
				// Update the visualizer if the owner is the Instigator
				if( OwnerIsInstigator )
				{
					// Send the total task count to the visualizer if this agent is the Instigator
					AgentApplication.UpdateMachineState( System.Net.Dns.GetHostName(), ( Int32 )TaskCount, EProgressionState.TaskTotal );
				}

				// If we have more than N pending tasks, the job owner is the instigator,
				// and the Agent settings and the job specification allow distribution,
				// request the help of other, remote agents (N is currently the number of
				// allowed worker threads for local job executables)
				if( ( DistributionConditionsMet ) &&
					( PendingTasks.Count > AgentApplication.DeveloperOptions.LocalJobsDefaultProcessorCount ) )
				{
					// Spawn a thread to manage adding more agents to the job
					ThreadPool.QueueUserWorkItem( new WaitCallback( OpenJobOnRemotesThreadProc ), this );
				}
			}
		}
	}

	return ( ErrorCode );
}

// Reorders a cost-descending task list so that it is read as NumStripes consecutive
// stripes, where NumStripes = ceil(count / TasksPerAgent) and stripe s holds indices
// s, s + NumStripes, s + 2 * NumStripes, ... of the input. Each stripe (at most
// TasksPerAgent tasks) therefore takes one task from each band of NumStripes similarly
// priced tasks, instead of clustering all of the most expensive tasks together.
// TasksPerAgent must be at least 1.
private static List<AgentTask> StripeTasksByCostRoundRobin( List<AgentTask> TasksDescending, int TasksPerAgent )
{
	int NumTasks = TasksDescending.Count;
	int NumStripes = ( NumTasks + TasksPerAgent - 1 ) / TasksPerAgent;

	List<AgentTask> Striped = new List<AgentTask>( NumTasks );
	for( int Stripe = 0; Stripe < NumStripes; Stripe++ )
	{
		for( int SourceIndex = Stripe; SourceIndex < NumTasks; SourceIndex += NumStripes )
		{
			Striped.Add( TasksDescending[SourceIndex] );
		}
	}

	// Every source index is visited exactly once
	Debug.Assert( Striped.Count == NumTasks );
	return Striped;
}
|
|
|
|
public Int32 CloseJob()
{
	// Closes this job. The first call marks the job CLOSED and stamps StopTime. It then
	// resolves outstanding reservations and, for agent-managed task-based jobs, decides the
	// final success state and sends the completion message when this agent is the
	// Instigator. It closes the job on every remote child. On the Instigator it also
	// records the task assignment of a successful job for deterministic replay and logs
	// per-task timings. Always returns Constants.SUCCESS.

	// Before we close the job, make sure all messages have been processed to
	// make sure we avoid any race condition between getting updates to tasks
	// or the job and closing the job
	Manager.FlushMessageQueue( Owner, false );

	// Update the state only within a mutex to protect anyone trying to read
	// the state at the same time
	lock( CurrentStateLock )
	{
		// Only do this if this Job hasn't already been closed
		if( CurrentState != JobState.AGENT_JOB_CLOSED )
		{
			CurrentState = JobState.AGENT_JOB_CLOSED;
			StopTime = DateTime.UtcNow;

			// First, resolve any outstanding reservations (with the state now CLOSED,
			// CheckForReservations sends RELEASE to every holder and zeroes the counts)
			CheckForReservations();

			// Determine success state of the Job
			lock( CurrentSuccessStateLock )
			{
				// Only update if the Job success state has not been determined already by other means
				if( CurrentSuccessState == JobSuccessState.AGENT_JOB_INCOMPLETE )
				{
					// If this is an agent managed, Task-based process
					if( ( Specification != null ) &&
						( ( Specification.JobFlags & EJobTaskFlags.FLAG_MANUAL_START ) == 0 ) &&
						( TaskCount > 0 ) )
					{
						// Check for a set of known failure cases
						bool IsAStandardFailureCase = false;
						string NewMessageText = "No message provided!";

						// If there are any Tasks still pending
						if( PendingTasks.Count != 0 )
						{
							// Log and send an INFO message describing the failure
							NewMessageText = "Job has failed! Job is closed while Tasks are still PENDING";
							IsAStandardFailureCase = true;
						}
						// If there are any Tasks still running
						else if( RunningTasks.Count != 0 )
						{
							// Log and send an INFO message describing the failure
							NewMessageText = "Job has failed! Job is closed while Tasks are still RUNNING";
							IsAStandardFailureCase = true;
						}
						// If any Task was reported a failure
						else if( TaskFailureCount != 0 )
						{
							// Log and send an INFO message describing the failure
							NewMessageText = "Job has failed! The task failure count is non-zero";
							IsAStandardFailureCase = true;
						}

						if( IsAStandardFailureCase )
						{
							CurrentSuccessState = JobSuccessState.AGENT_JOB_FAILURE;
							if( OwnerIsInstigator )
							{
								SendJobCompletedMessage( new AgentInfoMessage( NewMessageText ) );
							}
						}
						else
						{
							// This is the only way to mark an agent managed, Task-based Job a success
							CurrentSuccessState = JobSuccessState.AGENT_JOB_SUCCESS;
							if( OwnerIsInstigator )
							{
								// Send an INFO message announcing the success
								NewMessageText = "Job is a success!";
								SendJobCompletedMessage( new AgentInfoMessage( NewMessageText ) );
							}
						}
					}
					else
					{
						// Otherwise, the process should shut itself down now that the Job
						// has been ended and any reservations have been sent out. If it
						// fails to quit itself, it will be killed and we'll still get
						// the exited process callback
					}
				}
			}

			// For each remote connection we have for this Job, end the Job.
			// Ending the Job will eventually cause remote Job executables to
			// be notified that the Job is closed.
			foreach( RemoteConnection RemoteChild in Owner.RemoteChildren.Values )
			{
				Hashtable CloseJobInParameters = null;
				Hashtable CloseJobOutParameters = null;
				RemoteChild.Interface.CloseJob( RemoteChild.Handle, CloseJobInParameters, ref CloseJobOutParameters );
			}

			// If this is the Instigator, perform additional post-job work
			if( OwnerIsInstigator )
			{
				// Inform the visualizer that we've disconnected
				AgentApplication.UpdateMachineState(System.Net.Dns.GetHostName(), -1, EProgressionState.InstigatorDisconnected );

				// If the job was a success, record the state for deterministic replay
				if( CurrentSuccessState == JobSuccessState.AGENT_JOB_SUCCESS )
				{
					// We're done with the last run record now
					Manager.LastSuccessfulJobRecord = null;

					bool DeterministicModeAllowed = true;
					AgentJobRecord NewJobRecord = new AgentJobRecord();
					NewJobRecord.Specification = Specification;

					// Sort all retired tasks by assign time to make sure the order is correct
					List<AgentTask> ListOfRetiredTasks = new List<AgentTask>( RetiredTasks.ToArray() );
					ListOfRetiredTasks.Sort( AgentTask.CompareTasksByAssignTime );

					// Assign out the tasks based on where it was assigned and when. The record
					// keeps references to these AgentTask objects; the per-agent queues built
					// here become the "golden" queues replayed by a later deterministic run
					foreach( AgentTask NextTask in ListOfRetiredTasks )
					{
						// Add the task to the set of all tasks
						NewJobRecord.AllTasks.Add( NextTask.Specification.TaskGuid, NextTask );

						// Add the task to the agent-specific queue, creating an entry
						// for the agent if we haven't seen it yet
						Queue<AgentTask> TaskQueue = null;
						string NameOfWorker = Manager.MachineNameFromConnection( NextTask.CurrentOwner );
						string IPAddressOfWorker = Manager.MachineIPAddressFromConnection( NextTask.CurrentOwner );
						if( !NewJobRecord.WorkerAgentNames.Contains( NameOfWorker ) )
						{
							// Create a new task queue for the newly discovered agent
							TaskQueue = new Queue<AgentTask>();
							TaskQueue.Enqueue( NextTask );

							// Add this new agent to both the set of names and the task mapping sets
							// (WorkerAgentNames and WorkerAgentIPAddresses are kept index-aligned)
							NewJobRecord.WorkerAgentNames.Add( NameOfWorker );
							NewJobRecord.WorkerAgentIPAddresses.Add( IPAddressOfWorker );
							NewJobRecord.AgentToGoldenTaskQueueMapping.Add( NameOfWorker, TaskQueue );
						}
						else if( NewJobRecord.AgentToGoldenTaskQueueMapping.TryGetValue( NameOfWorker, out TaskQueue ) )
						{
							// Queue up the next task
							TaskQueue.Enqueue( NextTask );
						}
						else
						{
							// Error, we should fail the entire thing
							DeterministicModeAllowed = false;
							break;
						}
					}

					// If allowed, assign it for the next run
					if( DeterministicModeAllowed )
					{
						Manager.LastSuccessfulJobRecord = NewJobRecord;
					}
				}

				// Report some of the stats for the Job
				// For each successfully completed task, log the time it took and the time/cost
				// (a Cost of 0 produces Infinity/NaN in the ratio rather than an exception)
				foreach( AgentTask NextTask in RetiredTasks.ToArray() )
				{
					if( NextTask.CurrentState.TaskState == EJobTaskState.TASK_STATE_COMPLETE_SUCCESS )
					{
						TimeSpan ScheduledTime = NextTask.StartTime - NextTask.AssignTime;
						TimeSpan RunningTime = NextTask.StopTime - NextTask.StartTime;
						string LogMessage = String.Format( "[CloseJob] Task {0} {1} - Scheduled(ms): {2}, Running(ms): {3}, Cost: {4}, Running(ms)/Cost: {5}",
							NextTask.Specification.TaskGuid,
							NextTask.Specification.Parameters,
							ScheduledTime.TotalMilliseconds,
							RunningTime.TotalMilliseconds,
							NextTask.Specification.Cost,
							( double )RunningTime.TotalMilliseconds / ( double )NextTask.Specification.Cost );
						Manager.Log( EVerbosityLevel.Verbose, ELogColour.Green, LogMessage );
					}
				}
			}
		}
	}

	// Attempt to report the final stats to the DB
	// NOTE(review): this runs on every call, including when the job was already closed;
	// confirm PostJobStatsToDB tolerates being called more than once per job
	PostJobStatsToDB();

	return ( Constants.SUCCESS );
}
|
|
|
|
private bool AvoidNextTask( Connection RequestingConection )
{
	// Returns true when a task should be withheld from the requester. This requires all of:
	// the AvoidLocalExecution option is on, the requester is a local connection, and the
	// job owner has at least one remote child that can take the work instead.
	Debug.Assert( RequestingConection.Job != null );

	if( !AgentApplication.Options.AvoidLocalExecution )
	{
		return false;
	}

	if( !( RequestingConection is LocalConnection ) )
	{
		return false;
	}

	return RequestingConection.Job.Owner.RemoteChildren.Count > 0;
}
|
|
|
|
public AgentTaskRequestResponse GetNextTask( Connection RequestingConnection )
{
	// Hands the next task to RequestingConnection, or tells it what to do instead.
	// Returns:
	//   - the task's AgentTaskSpecification when a task is assigned (the task is then
	//     tracked in both this job's and the requester's RunningTasks),
	//   - a RESERVATION response for local requesters when no task is available yet,
	//   - a RELEASE response when the job is closed, or when this agent is the Instigator,
	//     the requester is remote and there is nothing left to hand out,
	//   - null otherwise (job not yet running, or a reservation was only tracked for a
	//     remote requester on a non-Instigator agent).

	// We need the state to not change while we're in here
	lock( CurrentStateLock )
	{
		// Take the next Task off of the Pending list and add it to the Active list
		if( CurrentState == JobState.AGENT_JOB_RUNNING )
		{
			lock( PendingTasks )
			{
				AgentTask NextTask = null;

				// Check for whether we're running in deterministic mode
				if( DeterministicModeEnabled )
				{
					// If so, look up the requesting connection's task queue (keyed by machine name)
					// NOTE(review): these queues are copied from the last successful job record,
					// whose entries are that previous job's AgentTask objects (built from its
					// RetiredTasks in CloseJob). PendingTasks is never popped on this path, so
					// PendingTasks.Count stays non-zero, which CloseJob treats as a failure.
					// Confirm this is reconciled elsewhere.
					Queue<AgentTask> TaskQueue;
					string RequestorName = Manager.MachineNameFromConnection( RequestingConnection );
					if( Manager.LastSuccessfulJobRecord.AgentToTaskQueueMapping.TryGetValue( RequestorName, out TaskQueue ) )
					{
						if( TaskQueue.Count > 0 )
						{
							// Grab the next pending task
							NextTask = TaskQueue.Dequeue();
						}
					}
				}
				// Otherwise, we're in a standard distribution mode, see if there
				// are tasks available to hand out
				else if( PendingTasks.Count > 0 )
				{
					// Check to see if should avoid the next task
					if( !AvoidNextTask( RequestingConnection ) )
					{
						NextTask = PendingTasks.Pop();
					}
				}

				if( NextTask != null )
				{
					// Assign the new owner and add the task to the running sets
					NextTask.CurrentOwner = RequestingConnection;

					// Set the assign time of the task as well as the start time,
					// in case they don't send back a RUNNING message, which will
					// set the proper start time
					NextTask.AssignTime = DateTime.UtcNow;
					NextTask.StartTime = DateTime.UtcNow;

					// Update additional stats (undone by UpdateTaskStateAsRequeued)
					if( RequestingConnection is RemoteConnection )
					{
						TaskCountRemote++;
					}

					AgentTaskSpecification TaskSpecification = NextTask.Specification;
					RequestingConnection.RunningTasks.Add( TaskSpecification.TaskGuid, NextTask );
					RunningTasks.Add( TaskSpecification.TaskGuid, NextTask );

					// Send back the now ready task specification
					return TaskSpecification;
				}
				// If we're not the final authority of this job, then we'll send back a
				// reservation and allow the Instigator send the real answer later.
				// Alternately, if the owner is the Instigator, we also need to send
				// back a reservation to make sure the local job application runs until
				// the job is done.
				else if( ( OwnerIsInstigator == false ) ||
						 ( RequestingConnection is LocalConnection ) )
				{
					// Always track outstanding reservations; CheckForReservations fills or releases them
					RequestingConnection.ReservationCount++;
					TaskReservationCount++;

					// For local connections, send a RESERVATION response which indicates that we
					// have no work currently to hand out, but we might in the future, so sit
					// tight. For remote connections, we only need to track the reservation
					// (control then falls through to the final "return null")
					if( RequestingConnection is LocalConnection )
					{
						return ( new AgentTaskRequestResponse( JobGuid,
															   ETaskRequestResponseType.RESERVATION ) );
					}
				}
				// Otherwise, we're all out of tasks, so release the worker
				else
				{
					// No more distributed tasks to hand out, send back a release
					return ( new AgentTaskRequestResponse( JobGuid,
														   ETaskRequestResponseType.RELEASE ) );
				}
			}
		}
		else if( CurrentState == JobState.AGENT_JOB_CLOSED )
		{
			// No more distributed tasks to hand out, send back a release
			return ( new AgentTaskRequestResponse( JobGuid,
												   ETaskRequestResponseType.RELEASE ) );
		}
	}
	return null;
}
|
|
|
|
public void CancelReservations( Connection ReservationHolder )
{
	// Discards every outstanding task reservation held by ReservationHolder. The
	// job-wide total is reduced by the holder's count before the holder is reset.
	// Both updates happen under CurrentStateLock so readers never see them out of step.
	lock( CurrentStateLock )
	{
		// Subtract first: the holder's count is needed before it is cleared
		TaskReservationCount -= ReservationHolder.ReservationCount;

		// The holder no longer has anything outstanding
		ReservationHolder.ReservationCount = 0;
	}
}
|
|
|
|
public void CheckForReservations()
{
	// Reconciles outstanding task reservations with the current job state:
	//   - UNSPECIFIED / PENDING: nothing to do (asserts there are no reservations),
	//   - RUNNING: hands pending tasks to reservation holders, local connections first
	//     (sent directly as messages), then remote connections (sent through AddTask),
	//   - CLOSED: sends a single RELEASE to every holder and zeroes all reservation counts.

	// We need the state to not change while we're in here
	lock( CurrentStateLock )
	{
		// Depending on the current Job state, handle any outstanding Task reservations
		if( ( CurrentState == JobState.AGENT_JOB_UNSPECIFIED ) ||
			( CurrentState == JobState.AGENT_JOB_PENDING ) )
		{
			// As long as the job is PENDING, do nothing. In fact, there shouldn't be
			// any reservations at this point. Assert this.
			Debug.Assert( TaskReservationCount == 0 );
		}
		else if( CurrentState == JobState.AGENT_JOB_RUNNING )
		{
			// If there are any outstanding reservations and tasks to give out...
			UInt32 PendingTasksCount = ( UInt32 )PendingTasks.Count;
			UInt32 ReservationsToTryToFill = Math.Min( PendingTasksCount, TaskReservationCount );

			if( ReservationsToTryToFill > 0 )
			{
				// Try to fulfill the local ones first
				List<LocalConnection> LocalChildren = Owner.LocalChildren.Values;
				List<LocalConnection>.Enumerator Children = LocalChildren.GetEnumerator();

				while( ( Children.MoveNext() ) &&
					   ( ReservationsToTryToFill > 0 ) )
				{
					LocalConnection Local = Children.Current;
					while( ( Local.ReservationCount > 0 ) &&
						   ( ReservationsToTryToFill > 0 ) )
					{
						// If we should avoid assigning tasks to this agent,
						// simply continue on to the next one (only meaningful
						// for local connections since we currently would only
						// avoid tasks on the Instigator)
						if( AvoidNextTask( Local ) )
						{
							break;
						}

						AgentTaskRequestResponse Response = GetNextTask( Local );
						if( ( Response != null ) &&
							( Response is AgentTaskSpecification ) )
						{
							AgentTaskSpecification NextTask = Response as AgentTaskSpecification;

							// For local connections, send the task message directly
							Response.To = Local.Handle;
							Response.From = Owner.Handle;
							Manager.SendMessageInternal( Owner, Response );

							// Consider the reservation satisfied
							TaskReservationCount--;
							Local.ReservationCount--;
							ReservationsToTryToFill--;
						}
						else
						{
							// If we didn't get a task, don't keep trying for now
							ReservationsToTryToFill = 0;
						}
					}
				}
			}

			// If we still have pending tasks try to fulfill any outstanding remote reservations
			if( ReservationsToTryToFill > 0 )
			{
				List<RemoteConnection> RemoteChildren = Owner.RemoteChildren.Values;
				List<RemoteConnection>.Enumerator Children = RemoteChildren.GetEnumerator();

				while( ( Children.MoveNext() ) &&
					   ( ReservationsToTryToFill > 0 ) )
				{
					RemoteConnection Remote = Children.Current;
					while( ( Remote.ReservationCount > 0 ) &&
						   ( ReservationsToTryToFill > 0 ) )
					{
						AgentTaskRequestResponse Response = GetNextTask( Remote );
						if( ( Response != null ) &&
							( Response is AgentTaskSpecification ) )
						{
							AgentTaskSpecification NextTask = Response as AgentTaskSpecification;

							// For remote connections, add the task using the job/task API
							Hashtable AddTaskInParameters = new Hashtable();
							AddTaskInParameters["Version"] = ESwarmVersionValue.VER_1_0;
							AddTaskInParameters["Specification"] = NextTask;
							Hashtable AddTaskOutParameters = null;

							if( Remote.Interface.AddTask( Remote.Handle, AddTaskInParameters, ref AddTaskOutParameters ) >= 0 )
							{
								// Consider the reservation satisfied
								TaskReservationCount--;
								Remote.ReservationCount--;
								ReservationsToTryToFill--;
							}
							else
							{
								// If this fails for any reason, update the task and move on to the next child.
								// Note that marking it KILLED will allow it to be requeued for another agent
								// and ultimately it'll end up on the local agent, where if it fails, it'll
								// be marked a real failure.
								// NOTE(review): KILLED on a remote owner goes through UpdateTaskStateAsRequeued,
								// which pushes the task back and calls CheckForReservations again (the lock is
								// reentrant on this thread). This remote still holds its reservations, so the
								// nested pass may pick it again. If AddTask keeps failing for it, this could
								// recurse repeatedly. Confirm the connection is dropped before that happens.
								UpdateTaskState( new AgentTaskState( NextTask.JobGuid, NextTask.TaskGuid, EJobTaskState.TASK_STATE_KILLED ) );
								break;
							}
						}
						else
						{
							// If we didn't get a task, don't keep trying for now
							ReservationsToTryToFill = 0;
						}
					}
				}
			}
		}
		else if( CurrentState == JobState.AGENT_JOB_CLOSED )
		{
			// If there are any outstanding reservations, we can release them now that we know
			// there will be no more tasks added to the Job
			if( TaskReservationCount > 0 )
			{
				foreach( LocalConnection Local in Owner.LocalChildren.Values )
				{
					// If there are any outstanding reservations, a single RELEASE is sufficient
					if( Local.ReservationCount > 0 )
					{
						AgentTaskRequestResponse ReleaseMessage =
							new AgentTaskRequestResponse( JobGuid, ETaskRequestResponseType.RELEASE );

						ReleaseMessage.To = Local.Handle;
						ReleaseMessage.From = Owner.Handle;
						Manager.SendMessageInternal( Owner, ReleaseMessage );
					}
					// Consider all reservations canceled
					TaskReservationCount -= Local.ReservationCount;
					Local.ReservationCount = 0;
				}

				foreach( RemoteConnection Remote in Owner.RemoteChildren.Values )
				{
					// If there are any outstanding reservations, a single RELEASE is sufficient
					if( Remote.ReservationCount > 0 )
					{
						AgentTaskRequestResponse ReleaseMessage =
							new AgentTaskRequestResponse( JobGuid, ETaskRequestResponseType.RELEASE );

						ReleaseMessage.To = Remote.Handle;
						ReleaseMessage.From = Owner.Handle;
						Manager.SendMessageInternal( Owner, ReleaseMessage );
					}
					// Consider all reservations canceled
					TaskReservationCount -= Remote.ReservationCount;
					Remote.ReservationCount = 0;
				}

				// Sanity check: every reservation belongs to some local or remote child
				Debug.Assert( TaskReservationCount == 0 );
			}
		}
	}
}
|
|
|
|
private void UpdateTaskStateAsSuccess( AgentTask RunningTask )
{
	// Retires a task that completed successfully. It counts the success, stamps the
	// stop time, drops the task from the job's and its owner's running sets, and
	// appends it to the retired queue.
	TaskSuccessCount++;
	RunningTask.StopTime = DateTime.UtcNow;

	AgentTaskSpecification FinishedSpecification = RunningTask.Specification;
	Connection TaskOwner = RunningTask.CurrentOwner;
	RunningTasks.Remove( FinishedSpecification.TaskGuid );
	TaskOwner.RunningTasks.Remove( FinishedSpecification.TaskGuid );

	RetiredTasks.Enqueue( RunningTask );
}
|
|
|
|
private void UpdateTaskStateAsFailure( AgentTask RunningTask )
{
	// Retires a task that is treated as a real failure. It counts the failure, stamps
	// the stop time, drops the task from the job's and its owner's running sets, and
	// appends it to the retired queue. A non-zero failure count later makes CloseJob
	// mark the job as failed.
	TaskFailureCount++;
	RunningTask.StopTime = DateTime.UtcNow;

	AgentTaskSpecification FailedSpecification = RunningTask.Specification;
	Connection TaskOwner = RunningTask.CurrentOwner;
	RunningTasks.Remove( FailedSpecification.TaskGuid );
	TaskOwner.RunningTasks.Remove( FailedSpecification.TaskGuid );

	RetiredTasks.Enqueue( RunningTask );
}
|
|
|
|
private void UpdateTaskStateAsRequeued( AgentTask RunningTask )
{
	// Returns a killed or rejected task to the pending stack so another agent can take
	// it. This happens when a connection goes away, or refuses work, while the task is
	// still assigned to it. The task is detached from its previous owner and reset to
	// IDLE. Reservations are then re-checked so a waiting agent can pick it up at once.
	AgentTaskSpecification RequeuedSpecification = RunningTask.Specification;
	Connection PreviousOwner = RunningTask.CurrentOwner;

	RunningTask.StopTime = DateTime.UtcNow;
	RunningTasks.Remove( RequeuedSpecification.TaskGuid );
	PreviousOwner.RunningTasks.Remove( RequeuedSpecification.TaskGuid );

	// Only hand-outs to remote connections were counted in TaskCountRemote, so undo that here
	if( PreviousOwner is RemoteConnection )
	{
		TaskCountRemote--;
	}

	// Put the task back into a clean, unowned IDLE state before it is queued again
	RunningTask.CurrentState = new AgentTaskState( RequeuedSpecification.JobGuid, RequeuedSpecification.TaskGuid, EJobTaskState.TASK_STATE_IDLE );
	RunningTask.CurrentOwner = null;

	PendingTasks.Push( RunningTask );
	TaskCountRequeue++;

	// Give any reservation holder a chance to take the task right away
	CheckForReservations();
}
|
|
|
|
/**
 * Applies a state update for one of this Job's running Tasks, then re-evaluates
 * the Job's overall success state.
 *
 * Updates for Tasks not present in RunningTasks are ignored. Rejected or killed
 * Tasks are requeued when their owner is a RemoteConnection and counted as
 * failures otherwise; Tasks that complete with failure are always counted as
 * failures. The first failure makes the Job a failure; the Job becomes a success
 * only on the Instigator once every Task has succeeded. Either outcome is sticky.
 *
 * @param UpdatedTaskState  The new state reported for a Task, identified by its TaskGuid
 */
public void UpdateTaskState( AgentTaskState UpdatedTaskState )
{
	// Sanity checks: updates are only expected after EndJobSpecification (RUNNING or CLOSED)
	Debug.Assert( CurrentState != JobState.AGENT_JOB_UNSPECIFIED );
	Debug.Assert( CurrentState != JobState.AGENT_JOB_PENDING );

	AgentTask RunningTask;
	if( RunningTasks.TryGetValue( UpdatedTaskState.TaskGuid, out RunningTask ) )
	{
		// Update the individual Task state
		RunningTask.CurrentState = UpdatedTaskState;
		switch( UpdatedTaskState.TaskState )
		{
			case EJobTaskState.TASK_STATE_ACCEPTED:
				// Nothing to do right now, but we'll need to track start times, etc. later
				break;

			case EJobTaskState.TASK_STATE_RUNNING:
				// Mark the real start time of this task (also set when we give the task out)
				RunningTask.StartTime = DateTime.UtcNow;
				break;

			case EJobTaskState.TASK_STATE_COMPLETE_SUCCESS:
				UpdateTaskStateAsSuccess( RunningTask );
				break;

			case EJobTaskState.TASK_STATE_REJECTED:
				RequeueOrFailStoppedTask( RunningTask, "Rejected" );
				break;

			case EJobTaskState.TASK_STATE_KILLED:
				RequeueOrFailStoppedTask( RunningTask, "Killed" );
				break;

			case EJobTaskState.TASK_STATE_COMPLETE_FAILURE:
				if( RunningTask.CurrentOwner is RemoteConnection )
				{
					Manager.Log( EVerbosityLevel.Informative, ELogColour.Red, "[UpdateTaskState]: Task Failed on " + ( RunningTask.CurrentOwner as RemoteConnection ).Info.Name );
				}
				else
				{
					Manager.Log( EVerbosityLevel.Informative, ELogColour.Red, "[UpdateTaskState]: Task Failed on " + System.Net.Dns.GetHostName() );
				}
				Manager.Log( EVerbosityLevel.Informative, ELogColour.Red, "[UpdateTaskState]: Task Failed: " + RunningTask.Specification.Parameters );
				UpdateTaskStateAsFailure( RunningTask );
				break;
		}

		// Update the owning Job state, by checking for failures. Success is only
		// determined after all Tasks are assured to be done or orphanable. Only
		// do this one time and let the new state, if there is one, be sticky.
		lock( CurrentSuccessStateLock )
		{
			if( CurrentSuccessState == JobSuccessState.AGENT_JOB_INCOMPLETE )
			{
				// Update if any task is a failure
				if( TaskFailureCount > 0 )
				{
					// Update the state and, on the Instigator, send a message indicating the failure
					CurrentSuccessState = JobSuccessState.AGENT_JOB_FAILURE;
					if( OwnerIsInstigator )
					{
						string NewMessageText = "Job has failed! The task failure count is non-zero";
						SendJobCompletedMessage( new AgentInfoMessage( NewMessageText ) );
					}
				}
				// Update if all tasks are successful and we're the instigator, since only
				// the instigator can make this determination properly
				else if( ( TaskSuccessCount == TaskCount ) &&
						 ( OwnerIsInstigator ) )
				{
					// OwnerIsInstigator is already guaranteed by the condition above,
					// so the success message is sent unconditionally here
					CurrentSuccessState = JobSuccessState.AGENT_JOB_SUCCESS;
					string NewMessageText = "Job is a success!";
					SendJobCompletedMessage( new AgentInfoMessage( NewMessageText ) );
				}
			}
		}

		// Update the visualizer if this agent is the Instigator
		if( OwnerIsInstigator )
		{
			AgentApplication.UpdateMachineState( System.Net.Dns.GetHostName(), RetiredTasks.Count, EProgressionState.TasksCompleted );
			AgentApplication.UpdateMachineState( System.Net.Dns.GetHostName(), RunningTasks.Count, EProgressionState.TasksInProgress );
		}
	}
}

/**
 * Handles a running Task that its owner stopped before completion (rejected or killed).
 *
 * If the owner is a RemoteConnection, the stop is logged and the Task is requeued
 * for another agent. Otherwise the Task was stopped on this machine, so the stop
 * is logged and the Task is counted as a failure.
 *
 * @param StoppedTask  The running Task that was stopped
 * @param StopReason   Verb used verbatim in the log text ("Rejected" or "Killed")
 */
private void RequeueOrFailStoppedTask( AgentTask StoppedTask, string StopReason )
{
	RemoteConnection RemoteOwner = StoppedTask.CurrentOwner as RemoteConnection;
	if( RemoteOwner != null )
	{
		Manager.Log( EVerbosityLevel.Informative, ELogColour.Orange, "[UpdateTaskState]: Task " + StopReason + " remotely by " + RemoteOwner.Info.Name );
		Manager.Log( EVerbosityLevel.Informative, ELogColour.Orange, "[UpdateTaskState]: Requeueing: " + StoppedTask.Specification.Parameters );
		UpdateTaskStateAsRequeued( StoppedTask );
	}
	else
	{
		Manager.Log( EVerbosityLevel.Informative, ELogColour.Red, "[UpdateTaskState]: Task " + StopReason + " locally by " + System.Net.Dns.GetHostName() + ", counted as failure" );
		UpdateTaskStateAsFailure( StoppedTask );
	}
}
|
|
|
|
// Specification for the Job.
// Fields without an initializer start at their type's default (null / 0 / false).
// Manager is the Agent whose Log is used for Task updates; Owner is the Job's connection.
public Agent Manager;
public Connection Owner;
public bool OwnerIsInstigator;
public bool DeterministicModeEnabled;

// Job identity. Specification32 / Specification64 are presumably per-architecture
// variants, with Specification the one in use (TODO confirm)
public AgentGuid JobGuid;
public AgentJobSpecification Specification32;
public AgentJobSpecification Specification64;
public AgentJobSpecification Specification;

// Specifications for Tasks belonging to the Job: counters plus the pending,
// running (keyed by Task GUID) and retired Task collections
public UInt32 TaskCount;
public UInt32 TaskCountRemote;
public UInt32 TaskCountRequeue;
public UInt32 TaskSuccessCount;
public UInt32 TaskFailureCount;
public UInt32 TaskReservationCount;
public ReaderWriterStack<AgentTask> PendingTasks = new ReaderWriterStack<AgentTask>();
public ReaderWriterDictionary<AgentGuid, AgentTask> RunningTasks = new ReaderWriterDictionary<AgentGuid, AgentTask>();
public ReaderWriterQueue<AgentTask> RetiredTasks = new ReaderWriterQueue<AgentTask>();

// Variables used for Job process monitoring; each state has its own lock object
public JobState CurrentState = JobState.AGENT_JOB_UNSPECIFIED;
public Object CurrentStateLock = new Object();
public JobSuccessState CurrentSuccessState = JobSuccessState.AGENT_JOB_INCOMPLETE;
public Object CurrentSuccessStateLock = new Object();

// The Job's process (if any) and the lock guarding it
public Process ProcessObject;
public Object ProcessObjectLock = new Object();
public Int32 ProcessObjectOutputLines;

// Process exit code and peak memory samples
public Int32 ProcessObjectExitCode;
public Int64 ProcessObjectPeakVirtualMemorySize64;
public Int64 ProcessObjectPeakWorkingSet64;

// Cache usage statistics for the Job
public Int32 CacheRequests;
public Int32 CacheMisses;
public Int32 CacheHits;

// Simple push/pull channel statistics for the Job
public Int64 NetworkBytesSent;
public Int64 NetworkBytesReceived;

// Optional, common description values for the job
public Hashtable Description32;
public Hashtable Description64;
public Hashtable Description;

public bool JobStatsPostedToDB;

public DateTime StartTime;
public DateTime StopTime;
|
|
}
|
|
|
|
///////////////////////////////////////////////////////////////////////////
|
|
|
|
/**
 * Bookkeeping for the most recent job: which remote agents were employed
 * (by name and by IP address) and where each of the job's tasks was
 * distributed to.
 */
public class AgentJobRecord
{
	public AgentJobSpecification Specification;
	public List<string> WorkerAgentNames = new List<string>();
	public List<string> WorkerAgentIPAddresses = new List<string>();
	public ReaderWriterDictionary<AgentGuid, AgentTask> AllTasks = new ReaderWriterDictionary<AgentGuid, AgentTask>();
	public ReaderWriterDictionary<string, Queue<AgentTask>> AgentToGoldenTaskQueueMapping = new ReaderWriterDictionary<string, Queue<AgentTask>>();
	public ReaderWriterDictionary<string, Queue<AgentTask>> AgentToTaskQueueMapping = new ReaderWriterDictionary<string, Queue<AgentTask>>();

	/**
	 * Creates an empty record: no Specification and empty collections
	 * (all set up by the field initializers above).
	 */
	public AgentJobRecord()
	{
	}
}
|
|
|
|
///////////////////////////////////////////////////////////////////////////
|
|
|
|
/**
 * Implementation of job management behavior in the Agent
 */
public partial class Agent : MarshalByRefObject, IAgentInternalInterface, IAgentInterface
{
	///////////////////////////////////////////////////////////////////////////

	// The set of all active Jobs for this Agent, keyed by Job GUID
	public ReaderWriterDictionary<AgentGuid, AgentJob> ActiveJobs = new ReaderWriterDictionary<AgentGuid, AgentJob>();
	public AgentJobRecord LastSuccessfulJobRecord = null;

	// A special debug Job GUID; MaintainJobs never kills the process of a closed Job with this GUID
	public AgentGuid DebuggingJobGuid = new AgentGuid( 0x00000123, 0x00004567, 0x000089ab, 0x0000cdef );

	///////////////////////////////////////////////////////////////////////////

	/**
	 * Closes the connection of every active Job owned by a RemoteConnection, which
	 * should quit that Job and release all associated resources, so that a newly
	 * opened local Job takes precedence. Jobs owned by local connections are left running.
	 */
	private void InterruptRemoteJobs()
	{
		foreach( AgentJob Job in ActiveJobs.Values )
		{
			// If any remote jobs are running, we'll kill them
			if( Job.Owner is RemoteConnection )
			{
				// We need to close this connection, which should quit the job and release all associated resources
				Log( EVerbosityLevel.Verbose, ELogColour.Green, "[OpenJob] Interrupting remote job in favor of local one" );
				Hashtable OwnerInParameters = null;
				Hashtable OwnerOutParameters = null;
				CloseConnection( Job.Owner.Handle, OwnerInParameters, ref OwnerOutParameters );
			}
			else
			{
				Debug.Assert( Job.Owner is LocalConnection );

				// TODO: By allowing multiple local jobs to run, we expose
				// limitations in the visualizer that cause it to do some odd
				// things like assuming that all messages are referring to the
				// same job (and thus easily confused)
			}
		}
	}

	/**
	 * Opens a new Job on behalf of the connection identified by ConnectionHandle.
	 *
	 * The JobGuid is validated first; then any active remote Jobs are interrupted, the
	 * cache's "Jobs" folder is ensured and the Job-specific folder is recreated empty
	 * before the new AgentJob is opened.
	 *
	 * @param ConnectionHandle  Handle of the requesting connection
	 * @param InParameters      Must contain "JobGuid" holding an AgentGuid
	 * @param OutParameters     Not used
	 * @return Constants.SUCCESS; Constants.ERROR_JOB if JobGuid is missing/invalid or the
	 *         folders could not be prepared; Constants.ERROR_CONNECTION_NOT_FOUND for an
	 *         unknown connection
	 */
	private Int32 OpenJob_1_0( Int32 ConnectionHandle, Hashtable InParameters, ref Hashtable OutParameters )
	{
		StartTiming( "OpenJob_1_0-Internal", true );

		Int32 ErrorCode = Constants.INVALID;

		Connection JobOwner;
		if( Connections.TryGetValue( ConnectionHandle, out JobOwner ) )
		{
			Debug.Assert( ( JobOwner.Job == null ) ||
						  ( JobOwner.Job.CurrentState == AgentJob.JobState.AGENT_JOB_CLOSED ) );

			// Unpack the input parameters. Validate the GUID before touching any other
			// Jobs; a missing one would otherwise fault only after remote Jobs were interrupted
			AgentGuid JobGuid = InParameters["JobGuid"] as AgentGuid;
			if( JobGuid == null )
			{
				Log( EVerbosityLevel.Critical, ELogColour.Red, "[Job] OpenJob: Rejected, missing or invalid JobGuid" );
				ErrorCode = Constants.ERROR_JOB;
			}
			else
			{
				// Check for any other Jobs running
				InterruptRemoteJobs();

				// Ensure that the folders we need are there
				string JobsFolder = Path.Combine( AgentApplication.Options.CacheFolder, "Jobs" );
				ErrorCode = EnsureFolderExists( JobsFolder );
				if( ErrorCode >= 0 )
				{
					// For the Job-specific folder, if it's already there, clean it out;
					// we need to be sure to start every Job with an empty Job folder
					string JobSpecificFolder = Path.Combine( JobsFolder, "Job-" + JobGuid.ToString() );
					ErrorCode = DirectoryRecreate( JobSpecificFolder );
					if( ErrorCode >= 0 )
					{
						// Create the new Job object and initialize it via the OpenJob method
						AgentJob NewJob = new AgentJob( this );
						NewJob.OpenJob( JobOwner, JobGuid );

						Log( EVerbosityLevel.Informative, ELogColour.Green, "[Job] Accepted Job " + JobGuid.ToString() );
						ErrorCode = Constants.SUCCESS;
					}
				}

				// Check for errors from above
				if( ErrorCode < 0 )
				{
					Log( EVerbosityLevel.Critical, ELogColour.Red, "[Job] OpenJob: Rejected, unable to create necessary directories" );
					ErrorCode = Constants.ERROR_JOB;
				}
			}
		}
		else
		{
			Log( EVerbosityLevel.Critical, ELogColour.Red, "[Job] OpenJob: Rejected, unrecognized connection" );
			ErrorCode = Constants.ERROR_CONNECTION_NOT_FOUND;
		}

		StopTiming();
		return ErrorCode;
	}

	/**
	 * Forwards the 32/64 Job specifications (and optional descriptions) to the
	 * connection's Job via AgentJob.BeginJobSpecification.
	 *
	 * @return The Job's result, or Constants.ERROR_CONNECTION_NOT_FOUND when the
	 *         connection is unknown or has no Job
	 */
	private Int32 BeginJobSpecification_1_0( Int32 ConnectionHandle, Hashtable InParameters, ref Hashtable OutParameters )
	{
		StartTiming( "BeginJobSpecification_1_0-Internal", true );

		Int32 ErrorCode = Constants.INVALID;

		Connection JobOwner;
		if( ( Connections.TryGetValue( ConnectionHandle, out JobOwner ) &&
			( JobOwner.Job != null ) ) )
		{
			// Unpack the input parameters
			AgentJobSpecification Specification32 = InParameters["Specification32"] as AgentJobSpecification;
			AgentJobSpecification Specification64 = InParameters["Specification64"] as AgentJobSpecification;

			// Descriptions are optional
			Hashtable Description32 = null;
			if( InParameters.Contains( "Description32" ) )
			{
				Description32 = InParameters["Description32"] as Hashtable;
			}
			Hashtable Description64 = null;
			if( InParameters.Contains( "Description64" ) )
			{
				Description64 = InParameters["Description64"] as Hashtable;
			}

			ErrorCode = JobOwner.Job.BeginJobSpecification( Specification32, Description32, Specification64, Description64 );
		}
		else
		{
			Log( EVerbosityLevel.Critical, ELogColour.Red, "[Job] BeginJobSpecification: Rejected, unrecognized connection or job" );
			ErrorCode = Constants.ERROR_CONNECTION_NOT_FOUND;
		}

		StopTiming();
		return ErrorCode;
	}

	/**
	 * Adds a single Task ("Specification") or a list of Tasks ("Specifications") to the
	 * connection's Job. Allowed while the Job is PENDING, or at any time when the caller
	 * is a RemoteConnection.
	 *
	 * @return The result of the last AgentJob.AddTask call (a list stops at the first error);
	 *         Constants.ERROR_JOB if the Job does not accept Tasks now or the list is invalid;
	 *         Constants.ERROR_CONNECTION_NOT_FOUND for an unknown connection or missing Job;
	 *         Constants.INVALID if neither key is present or the list is empty
	 */
	private Int32 AddTask_1_0( Int32 ConnectionHandle, Hashtable InParameters, ref Hashtable OutParameters )
	{
		StartTiming( "AddTask_1_0-Internal", true );

		Int32 ErrorCode = Constants.INVALID;

		Connection JobOwner;
		if( ( Connections.TryGetValue( ConnectionHandle, out JobOwner ) &&
			( JobOwner.Job != null ) ) )
		{
			// This is only allowed while in the PENDING state, unless the one adding the
			// Task is a remote Agent (in contrast to a local connection, i.e. an Instigator)
			if( ( JobOwner.Job.CurrentState == AgentJob.JobState.AGENT_JOB_PENDING ) ||
				( JobOwner is RemoteConnection ) )
			{
				// Unpack the input parameters and pass them along
				if( InParameters.ContainsKey( "Specification" ) )
				{
					AgentTaskSpecification Specification = InParameters["Specification"] as AgentTaskSpecification;
					ErrorCode = JobOwner.Job.AddTask( Specification );
				}
				else if( InParameters.ContainsKey( "Specifications" ) )
				{
					List<AgentTaskSpecification> Specifications = InParameters["Specifications"] as List<AgentTaskSpecification>;
					if( Specifications == null )
					{
						// Present but null or of the wrong type; enumerating it would throw
						Log( EVerbosityLevel.Critical, ELogColour.Red, "[Job] AddTask: Rejected, invalid task specification list" );
						ErrorCode = Constants.ERROR_JOB;
					}
					else
					{
						foreach( AgentTaskSpecification NextSpecification in Specifications )
						{
							ErrorCode = JobOwner.Job.AddTask( NextSpecification );
							if( ErrorCode < 0 )
							{
								// If any of the individual adds has an error, stop and return
								break;
							}
						}
					}
				}
			}
			else
			{
				Log( EVerbosityLevel.Critical, ELogColour.Red, "[Job] AddTask: Rejected, cannot add a task to a running job" );
				ErrorCode = Constants.ERROR_JOB;
			}
		}
		else
		{
			Log( EVerbosityLevel.Critical, ELogColour.Red, "[Job] AddTask: Rejected, unrecognized connection or job" );
			ErrorCode = Constants.ERROR_CONNECTION_NOT_FOUND;
		}

		StopTiming();
		return ErrorCode;
	}

	/**
	 * Finishes the connection's Job specification via AgentJob.EndJobSpecification.
	 *
	 * @return The Job's result, or Constants.ERROR_CONNECTION_NOT_FOUND when the
	 *         connection is unknown or has no Job
	 */
	private Int32 EndJobSpecification_1_0( Int32 ConnectionHandle, Hashtable InParameters, ref Hashtable OutParameters )
	{
		StartTiming( "EndJobSpecification_1_0-Internal", true );

		Int32 ErrorCode = Constants.INVALID;

		Connection JobOwner;
		if( ( Connections.TryGetValue( ConnectionHandle, out JobOwner ) &&
			( JobOwner.Job != null ) ) )
		{
			ErrorCode = JobOwner.Job.EndJobSpecification();
		}
		else
		{
			Log( EVerbosityLevel.Critical, ELogColour.Red, "[Job] EndJobSpecification: Rejected, unrecognized connection or job" );
			ErrorCode = Constants.ERROR_CONNECTION_NOT_FOUND;
		}

		StopTiming();
		return ErrorCode;
	}

	/**
	 * Closes the connection's Job via AgentJob.CloseJob.
	 *
	 * @return The Job's result, or Constants.ERROR_CONNECTION_NOT_FOUND when the
	 *         connection is unknown or has no Job
	 */
	private Int32 CloseJob_1_0( Int32 ConnectionHandle, Hashtable InParameters, ref Hashtable OutParameters )
	{
		StartTiming( "CloseJob_1_0-Internal", true );

		Int32 ErrorCode = Constants.INVALID;

		Connection JobOwner;
		if( ( Connections.TryGetValue( ConnectionHandle, out JobOwner ) &&
			( JobOwner.Job != null ) ) )
		{
			ErrorCode = JobOwner.Job.CloseJob();
		}
		else
		{
			Log( EVerbosityLevel.Critical, ELogColour.Red, "[Job] CloseJob: Rejected, unrecognized connection or job" );
			ErrorCode = Constants.ERROR_CONNECTION_NOT_FOUND;
		}

		StopTiming();
		return ErrorCode;
	}

	/**
	 * Looks up the GUID of the Job attached to a connection.
	 *
	 * @param ActiveJobGuid  Set to the Job's GUID, or null if there is none
	 * @return Constants.SUCCESS if the connection has a Job; Constants.INVALID if it has
	 *         none; Constants.ERROR_CONNECTION_NOT_FOUND for an unknown connection
	 */
	public Int32 GetActiveJobGuid( Int32 ConnectionHandle, ref AgentGuid ActiveJobGuid )
	{
		StartTiming( "GetActiveJobGuid-Internal", true );

		ActiveJobGuid = null;
		Int32 ErrorCode = Constants.INVALID;

		Connection JobOwner;
		if( Connections.TryGetValue( ConnectionHandle, out JobOwner ) )
		{
			if( JobOwner.Job != null )
			{
				ActiveJobGuid = JobOwner.Job.JobGuid;
				ErrorCode = Constants.SUCCESS;
			}
		}
		else
		{
			Log( EVerbosityLevel.Critical, ELogColour.Red, "[Job] GetActiveJobGuid: Rejected, unrecognized connection" );
			ErrorCode = Constants.ERROR_CONNECTION_NOT_FOUND;
		}

		StopTiming();
		return ErrorCode;
	}

	/**
	 * Kills the process of a closed Job.
	 *
	 * @return true when the process is gone (killed now, or it had already exited /
	 *         is no longer associated), so the Job can be retired; false when termination
	 *         failed, in which case the Job stays active and is retried on the next pass
	 */
	private bool TryKillJobProcess( AgentJob Job )
	{
		try
		{
			Job.ProcessObject.Kill();
			return true;
		}
		catch( InvalidOperationException )
		{
			// The process exited between the HasExited check and Kill(), or the Process
			// object has no associated process; either way there is nothing left to kill
			return true;
		}
		catch( System.ComponentModel.Win32Exception Ex )
		{
			// The process could not be terminated (or is already terminating)
			Log( EVerbosityLevel.Informative, ELogColour.Red, "[Maintain Jobs] Failed to kill rogue Job \"" + Job.JobGuid + "\": " + Ex.Message );
			return false;
		}
	}

	/**
	 * Periodic maintenance pass over all active Jobs; does nothing until Initialized is set.
	 *
	 * RUNNING Jobs: logs their status, samples the peak memory of the Job process, and,
	 * when avoiding local execution on an Instigator with reservations but no remote
	 * children, asks the Job to re-check its reservations.
	 *
	 * CLOSED Jobs: if a non-debug Job's process is still alive, it is killed once the
	 * owner has disconnected or (when JobExecutableTimeout >= 0) that many seconds have
	 * passed since the Job's StopTime; otherwise the Job is removed from ActiveJobs.
	 */
	public void MaintainJobs()
	{
		// If we're not initialized yet, do nothing
		if( !Initialized.WaitOne( 0 ) )
		{
			return;
		}

		// For all running jobs, check for completion, crashes, etc
		List<AgentGuid> InactiveJobs = new List<AgentGuid>();
		foreach( AgentJob Job in ActiveJobs.Values )
		{
			// We need the state to not change while we're in here
			lock( Job.CurrentStateLock )
			{
				// We also need the process object, if it still is around, to stay around while we do this
				lock( Job.ProcessObjectLock )
				{
					// Only maintain running Jobs
					if( Job.CurrentState == AgentJob.JobState.AGENT_JOB_RUNNING )
					{
						bool IsAnAgentManagedProcess =
							( Job.Specification.JobFlags & EJobTaskFlags.FLAG_MANUAL_START ) == 0;

						// Check for fully manual, not yet attached job processes
						if( Job.ProcessObject == null )
						{
							String WaitingJobName = Job.Specification.ExecutableName + " " + Job.Specification.Parameters;
							if( IsAnAgentManagedProcess == false )
							{
								Log( EVerbosityLevel.Informative, ELogColour.Orange, "[Maintain Jobs] Job \"" + WaitingJobName + "\" is not connected yet" );
							}
							else
							{
								Log( EVerbosityLevel.ExtraVerbose, ELogColour.Orange, "[Maintain Jobs] Job \"" + WaitingJobName + "\" is either trying to start up or shut down" );
							}
						}
						else
						{
							string JobName = Job.Specification.ExecutableName;
							TimeSpan JobRunningTime = DateTime.UtcNow - Job.StartTime;
							Log( EVerbosityLevel.SuperVerbose, ELogColour.Green, "[Maintain Jobs] Job \"" + JobName + "\" has been running for " + JobRunningTime.TotalSeconds.ToString() + " seconds" );

							// Update the job process stats
							try
							{
								Job.ProcessObjectPeakVirtualMemorySize64 = Job.ProcessObject.PeakVirtualMemorySize64;
								Job.ProcessObjectPeakWorkingSet64 = Job.ProcessObject.PeakWorkingSet64;
							}
							catch( Exception )
							{
								// Might throw an exception if the process exits while we're updating, it's okay
							}

							// If we're trying to avoid local execution, but the job is running and the
							// Instigator has no remote children to help but does have outstanding
							// reservations (presumably from local connections), ask the job to check
							// the reservations to see if they should be rescheduled locally
							if( ( AgentApplication.Options.AvoidLocalExecution ) &&
								( Job.OwnerIsInstigator ) &&
								( Job.TaskReservationCount > 0 ) &&
								( Job.Owner.RemoteChildren.Count == 0 ) )
							{
								Job.CheckForReservations();
							}
						}
					}
					else if( Job.CurrentState == AgentJob.JobState.AGENT_JOB_CLOSED )
					{
						// Job has been marked closed, check for whether it quit cleanly on its own
						// as long as it's not the special debug job
						if( ( Job.ProcessObject != null ) &&
							( Job.ProcessObject.HasExited == false ) &&
							( Job.JobGuid.Equals( DebuggingJobGuid ) == false ) )
						{
							// Determine if the user-specified timeout has expired
							bool WaitedLongEnough = false;
							if( AgentApplication.DeveloperOptions.JobExecutableTimeout >= 0 )
							{
								// Wait for the user-specified time
								TimeSpan TimeSinceJobStopped = DateTime.UtcNow - Job.StopTime;
								TimeSpan TimeToWait = TimeSpan.FromSeconds( AgentApplication.DeveloperOptions.JobExecutableTimeout );

								if( TimeSinceJobStopped > TimeToWait )
								{
									string NewLogMessage = String.Format( "[Maintain Jobs] Waited for rogue Job for {0} seconds: \"{1}\"",
										AgentApplication.DeveloperOptions.JobExecutableTimeout,
										Job.JobGuid );
									Log( EVerbosityLevel.Informative, ELogColour.Orange, NewLogMessage );

									WaitedLongEnough = true;
								}
								else
								{
									Log( EVerbosityLevel.ExtraVerbose, ELogColour.Orange, "[Maintain Jobs] Waiting for Job to quit before killing: \"" + Job.JobGuid + "\"" );
								}
							}

							// If we've waited long enough or if the owner's parent is no longer
							// active (and thus unable to receive any of the results), kill it
							if( ( Job.Owner.CurrentState == ConnectionState.DISCONNECTED ) ||
								( WaitedLongEnough ) )
							{
								string NewLogMessage = String.Format( "[Maintain Jobs] Killing rogue Job \"{0}\"", Job.JobGuid );
								Log( EVerbosityLevel.Informative, ELogColour.Orange, NewLogMessage );

								// Only retire the Job once its process is really gone; a failed
								// kill leaves it active so the next pass tries again
								if( TryKillJobProcess( Job ) )
								{
									InactiveJobs.Add( Job.JobGuid );
								}
							}
						}
						else
						{
							// Place it on the inactive list
							Log( EVerbosityLevel.Verbose, ELogColour.Green, "[Maintain Jobs] Job is closed and done, removing from list: \"" + Job.JobGuid + "\"" );
							InactiveJobs.Add( Job.JobGuid );
						}
					}
					else
					{
						Log( EVerbosityLevel.ExtraVerbose, ELogColour.Orange, "[Maintain Jobs] Job is " + Job.CurrentState.ToString() );
					}
				}
			}
		}

		// Remove all inactive Jobs (outside the enumeration above)
		foreach( AgentGuid NextGuid in InactiveJobs )
		{
			ActiveJobs.Remove( NextGuid );
		}
	}
}
|
|
}
|
|
|