Files
2025-05-18 13:04:45 +08:00

1707 lines
41 KiB
C++

// Copyright Epic Games, Inc. All Rights Reserved.
#pragma once
namespace UE::IoStore::HTTP
{
// {{{1 event-loop-int .........................................................
////////////////////////////////////////////////////////////////////////////////
static FOutcome DoSend(FActivity* Activity, FHttpPeer& Peer)
{
#if IAS_HTTP_WITH_PERF
Activity->Stopwatch.SendStart();
#endif
TRACE_CPUPROFILER_EVENT_SCOPE(IasHttp::DoSend);
FBuffer& Buffer = Activity->Buffer;
const char* SendData = Buffer.GetData();
int32 SendSize = Buffer.GetSize();
uint32 AlreadySent = Activity->StateParam;
SendData += AlreadySent;
SendSize -= AlreadySent;
check(SendSize > 0);
FOutcome Outcome = Peer.Send(SendData, SendSize);
if (Outcome.IsError())
{
Activity_SetError(Activity, Outcome);
return Outcome;
}
if (Outcome.IsWaiting())
{
return Outcome;
}
check(Outcome.IsOk());
int32 Result = Outcome.GetResult();
Activity->StateParam += Result;
if (Activity->StateParam < Buffer.GetSize())
{
return DoSend(Activity, Peer);
}
#if IAS_HTTP_WITH_PERF
Activity->Stopwatch.SendEnd();
#endif
Activity_ChangeState(Activity, FActivity::EState::RecvMessage, Buffer.GetSize());
return FOutcome::Ok(Result);
}
////////////////////////////////////////////////////////////////////////////////
static FOutcome DoRecvPeer(FActivity* Activity, FHttpPeer& Peer, int32& MaxRecvSize, int32 Size)
{
Size = FMath::Min(Size, MaxRecvSize);
check(Size >= 0);
if (Size == 0)
{
return FOutcome::Waiting();
}
FMutableMemoryView DestView = Activity->Dest->GetMutableView();
char* Cursor = (char*)(DestView.GetData()) + Activity->StateParam;
check(Size + Activity->StateParam <= DestView.GetSize());
FOutcome Outcome = Peer.Recv(Cursor, Size);
if (Outcome.IsWaiting())
{
return Outcome;
}
if (Outcome.IsError())
{
Activity_SetError(Activity, Outcome);
return Outcome;
}
check(Outcome.IsOk());
int32 Result = Outcome.GetResult();
Activity->StateParam += Result;
MaxRecvSize = FMath::Max(MaxRecvSize - Result, 0);
return Outcome;
}
////////////////////////////////////////////////////////////////////////////////
static FOutcome DoRecvMessage(FActivity* Activity, FHttpPeer& Peer)
{
TRACE_CPUPROFILER_EVENT_SCOPE(IasHttp::DoRecvMessage);
static const uint32 PageSize = 256;
FBuffer& Buffer = Activity->Buffer;
const char* MessageRight;
while (true)
{
Trace(Activity, ETrace::StateChange, uint32(Activity->State));
#if IAS_HTTP_WITH_PERF
Activity->Stopwatch.RecvStart();
#endif
auto [Dest, DestSize] = Buffer.GetMutableFree(0, PageSize);
FOutcome Outcome = Peer.Recv(Dest, DestSize);
if (Outcome.IsError())
{
Activity_SetError(Activity, Outcome);
return Outcome;
}
if (Outcome.IsWaiting())
{
return Outcome;
}
check(Outcome.IsOk());
int32 Result = Outcome.GetResult();
Buffer.AdvanceUsed(Result);
// Rewind a little to cover cases where the terminal is fragmented across
// recv() calls
uint32 DestBias = 0;
if (Dest - 3 >= Buffer.GetData() + Activity->StateParam)
{
Dest -= (DestBias = 3);
}
int32 MessageEnd = FindMessageTerminal(Dest, Result + DestBias);
if (MessageEnd < 0)
{
if (Buffer.GetSize() > (8 << 10))
{
Activity_SetError(Activity, "Headers have grown larger than expected");
return FOutcome::Error(Activity->ErrorReason);
}
continue;
}
MessageRight = Dest + MessageEnd;
break;
}
// Fill out the internal response object
FResponseInternal& Internal = Activity->Response;
const char* MessageData = Buffer.GetData() + Activity->StateParam;
Internal.MessageLength = uint16(ptrdiff_t(MessageRight - MessageData));
FAnsiStringView ResponseView(MessageData, Internal.MessageLength);
if (ParseMessage(ResponseView, Internal.Offsets) < 0)
{
Activity_SetError(Activity, "Failed to parse message status");
return FOutcome::Error(Activity->ErrorReason);
}
// Parse headers
FAnsiStringView Headers = ResponseView.Mid(
Internal.Offsets.Headers,
Internal.MessageLength - Internal.Offsets.Headers - 2 // "-2" trims off '\r\n' that signals end of headers
);
int32 Count = 3;
bool bChunked = false;
bool IsKeepAlive = true;
int32 ContentLength = Activity->NoContent ? 0 : -1;
EnumerateHeaders(
Headers,
[&ContentLength, &bChunked, &IsKeepAlive, &Count] (FAnsiStringView Name, FAnsiStringView Value)
{
// todo; may need smarter value handling; ;/, separated options & key-value pairs (ex. in rfc2068)
if (Name.Equals("Content-Length", ESearchCase::IgnoreCase))
{
ContentLength = int32(CrudeToInt(Value));
Count--;
}
else if (Name.Equals("Transfer-Encoding", ESearchCase::IgnoreCase))
{
bChunked = Value.Equals("chunked", ESearchCase::IgnoreCase);
Count--;
}
else if (Name.Equals("Connection", ESearchCase::IgnoreCase))
{
IsKeepAlive = !Value.Equals("close");
Count--;
}
return Count > 0;
}
);
Activity->IsKeepAlive &= IsKeepAlive;
// Validate that the server's told us how and how much it will transmit
if (bChunked)
{
if (Activity->bAllowChunked == 0)
{
Activity_SetError(Activity, "Chunked transfer encoding disabled (ERRNOCHUNK)");
return FOutcome::Error(Activity->ErrorReason);
}
ContentLength = -1;
}
else if (ContentLength < 0)
{
Activity_SetError(Activity, "Missing/invalid Content-Length header");
return FOutcome::Error(Activity->ErrorReason);
}
// Call out to the sink to get a content destination
FIoBuffer* PriorDest = Activity->Dest; // to retain unioned Host ptr (redirect uses it in sink)
Internal.Code = -1;
Internal.ContentLength = ContentLength;
Activity_CallSink(Activity);
Activity->NoContent |= (ContentLength == 0);
// Check the user gave us a destination for content
FIoBuffer& Dest = *(Activity->Dest);
if (Activity->NoContent == 0)
{
if (&Dest == PriorDest)
{
Activity_SetError(Activity, "User did not provide a destination buffer");
return FOutcome::Error(Activity->ErrorReason);
}
// The user seems to have forgotten something. Let's help them along
if (int32 DestSize = int32(Dest.GetSize()); DestSize == 0)
{
static const uint32 DefaultChunkSize = 4 << 10;
uint32 Size = bChunked ? DefaultChunkSize : ContentLength;
Dest = FIoBuffer(Size);
}
else if (!bChunked && DestSize < ContentLength)
{
// todo: support piece-wise transfer of content (a la chunked).
Activity_SetError(Activity, "Destination buffer too small");
return FOutcome::Error(Activity->ErrorReason);
}
else if (enum { MinStreamBuf = 256 }; bChunked && DestSize < MinStreamBuf)
{
Dest = FIoBuffer(MinStreamBuf);
}
}
// Perhaps we have some of the content already?
const char* BufferRight = Buffer.GetData() + Buffer.GetSize();
uint32 AlreadyReceived = uint32(ptrdiff_t(BufferRight - MessageRight));
if (AlreadyReceived > uint32(ContentLength))
{
Activity_SetError(Activity, "More data received that expected");
return FOutcome::Error(Activity->ErrorReason);
}
// HEAD methods
if (Activity->NoContent == 1)
{
if (AlreadyReceived)
{
Activity_SetError(Activity, "Received content when none was expected");
return FOutcome::Error(Activity->ErrorReason);
}
Activity_ChangeState(Activity, FActivity::EState::RecvDone);
return FOutcome::Ok();
}
// We're all set to go and get content
check(Activity->Dest != nullptr);
auto NextState = bChunked ? FActivity::EState::RecvStream : FActivity::EState::RecvContent;
Activity_ChangeState(Activity, NextState, AlreadyReceived);
// Copy any of the content we may have already received.
if (AlreadyReceived == 0)
{
return FOutcome::Ok();
}
// This ordinarily doesn't happen due to the way higher levels pipeline
// requests. It can however occur with chunked transfers.
if (AlreadyReceived > Dest.GetSize())
{
Dest = FIoBuffer(AlreadyReceived);
}
FMutableMemoryView DestView = Dest.GetMutableView();
const char* Cursor = BufferRight - AlreadyReceived;
::memcpy(DestView.GetData(), Cursor, AlreadyReceived);
return FOutcome::Ok();
}
////////////////////////////////////////////////////////////////////////////////
static FOutcome DoRecvContent(FActivity* Activity, FHttpPeer& Peer, int32& MaxRecvSize)
{
TRACE_CPUPROFILER_EVENT_SCOPE(IasHttp::DoRecvContent);
while (true)
{
const FResponseInternal& Response = Activity->Response;
int32 Size = (Response.ContentLength - Activity->StateParam);
if (Size == 0)
{
break;
}
FOutcome Outcome = DoRecvPeer(Activity, Peer, MaxRecvSize, Size);
if (!Outcome.IsOk())
{
return Outcome;
}
}
#if IAS_HTTP_WITH_PERF
Activity->Stopwatch.RecvEnd();
#endif
Activity_ChangeState(Activity, FActivity::EState::RecvDone);
return FOutcome::Ok();
}
////////////////////////////////////////////////////////////////////////////////
static FOutcome DoRecvStream(FActivity* Activity, FHttpPeer& Peer, int32& MaxRecvSize)
{
auto RaiseCrLfError = [Activity] ()
{
if (Activity->NoContent)
{
Activity_SetError(Activity, "Trailing headers are not supported (ERRTRAIL)");
return FOutcome::Error(Activity->ErrorReason);
}
Activity_SetError(Activity, "Expected CRLF chunk terminal");
return FOutcome::Error(Activity->ErrorReason);
};
auto SinkData = [Activity] (FMemoryView View)
{
if (View.GetSize() == 0)
{
return;
}
// Temporarily clamp IoBuffer so if the sink does GetView/GetSize() it
// represents actual content and not the underlying working buffer.
FIoBuffer& Dest = *(Activity->Dest);
FIoBuffer Slice(View, Dest);
Swap(Dest, Slice);
Activity_CallSink(Activity);
Swap(Dest, Slice);
};
auto Done = [Activity] ()
{
#if IAS_HTTP_WITH_PERF
Activity->Stopwatch.RecvEnd();
#endif
*(Activity->Dest) = FIoBuffer();
Activity_ChangeState(Activity, FActivity::EState::RecvDone);
return FOutcome::Ok();
};
enum { CrLfLength = 2 };
int32 Size = int32(Activity->StateParam);
// Trailing chunk data.
while (Size < 0)
{
Activity->StateParam = 0;
Size = -Size;
int32 RefillSize = FMath::Min<int32>(Size, int32(Activity->Dest->GetSize()));
FOutcome Outcome = DoRecvPeer(Activity, Peer, MaxRecvSize, RefillSize);
if (!Outcome.IsOk())
{
Activity->StateParam = 0 - Size;
return Outcome;
}
int32 Result = Outcome.GetResult();
check(Result > 0);
FMemoryView View = Activity->Dest->GetView();
if (Size > CrLfLength)
{
int32 SinkSize = Size - CrLfLength;
SinkSize = FMath::Min(Result, SinkSize);
SinkData(View.Left(SinkSize));
View = View.Mid(SinkSize);
Size -= SinkSize;
Result -= SinkSize;
}
const char* Cursor = (char*)View.GetData();
int32 CrLfError = 0;
if (int32 n = CrLfLength; Size == n && Result >= n)
{
CrLfError |= (Cursor[0] != '\r');
--Size; --Result;
++Cursor;
}
if (int32 n = CrLfLength - 1; Size == n && Result >= n)
{
CrLfError |= (Cursor[0] != '\n');
--Size; --Result;
}
if (CrLfError)
{
return RaiseCrLfError();
}
Size = Result - Size;
Activity->StateParam = Size;
check(Size <= 0);
// Have we found the trailer-section that follows last-chunk?
if (Size == 0 && Activity->NoContent)
{
return Done();
}
}
// Peel off chunks
for (FMemoryView View = Activity->Dest->GetView(); Size > 0;)
{
const char* Cursor = (char*)(View.GetData());
// Isolate chunk size
int32 ChunkSize = -1;
uint32 HeaderLength = 0;
for (; HeaderLength < uint32(Size - 1); ++HeaderLength)
{
// Detect CRLF.
if (Cursor[HeaderLength + 1] != '\n')
{
continue;
}
++HeaderLength;
if (Cursor[HeaderLength - 1] != '\r')
{
continue;
}
++HeaderLength;
ChunkSize = int32(CrudeToInt<16>(Cursor));
if (ChunkSize < 0)
{
Activity_SetError(Activity, "Unparsable chunk size");
return FOutcome::Error(Activity->ErrorReason);
}
break;
}
// Maybe we were not able to find a CRLF terminator and need more data
if (ChunkSize < 0)
{
FMutableMemoryView WriteView = Activity->Dest->GetMutableView();
std::memmove(WriteView.GetData(), Cursor, Size);
Activity->StateParam = Size;
break;
}
check(ChunkSize >= 0);
Size -= HeaderLength;
// Dispatch as much data as we can.
uint32 SinkSize = FMath::Min<uint32>(ChunkSize, uint32(Size));
SinkData(View.Mid(HeaderLength, SinkSize));
View = View.Mid(HeaderLength + SinkSize);
Activity->StateParam = (Size -= ChunkSize);
Activity->NoContent = (ChunkSize == 0);
// A CRLF follows a chunk's data
Cursor = (char*)View.GetData();
int32 CrLfError = 0;
CrLfError |= (Size >= (CrLfLength - 1)) && Cursor[0] != '\r';
CrLfError |= (Size >= (CrLfLength - 0)) && Cursor[1] != '\n';
if (CrLfError != 0)
{
return RaiseCrLfError();
}
// Can we do CRLF now?
if (Size >= CrLfLength)
{
// Have we found the trailer-section that follows last-chunk?
if (Activity->NoContent)
{
return Done();
}
Activity->StateParam = (Size -= CrLfLength);
View = View.Mid(CrLfLength);
continue;
}
Activity->StateParam -= CrLfLength;
check(int32(Activity->StateParam) < 0);
break;
}
// Refill
if (int32(Activity->StateParam) >= 0)
{
uint32 RefillSize = uint32(Activity->Dest->GetSize()) - Activity->StateParam;
FOutcome Outcome = DoRecvPeer(Activity, Peer, MaxRecvSize, RefillSize);
if (!Outcome.IsOk())
{
return Outcome;
}
}
return DoRecvStream(Activity, Peer, MaxRecvSize);
}
////////////////////////////////////////////////////////////////////////////////
static FOutcome DoRecv(FActivity* Activity, FHttpPeer& Peer, int32& MaxRecvSize)
{
using EState = FActivity::EState;
EState State = Activity->State;
check(State >= EState::RecvMessage && State < EState::RecvDone);
if (State == EState::RecvMessage) return DoRecvMessage(Activity, Peer);
if (State == EState::RecvContent) return DoRecvContent(Activity, Peer, MaxRecvSize);
if (State == EState::RecvStream) return DoRecvStream(Activity, Peer, MaxRecvSize); //-V547
check(false); // it is not expected that we'll get here
return FOutcome::Error("unreachable");
}
////////////////////////////////////////////////////////////////////////////////
static void DoRecvDone(FActivity* Activity)
{
// Notify the user we've received everything
Activity_CallSink(Activity);
Activity_ChangeState(Activity, FActivity::EState::Completed);
}
////////////////////////////////////////////////////////////////////////////////
static void DoCancel(FActivity* Activity)
{
if (Activity->State >= FActivity::EState::Completed)
{
return;
}
Activity_ChangeState(Activity, FActivity::EState::Cancelled);
Activity_CallSink(Activity);
}
////////////////////////////////////////////////////////////////////////////////
static void DoFail(FActivity* Activity)
{
check(Activity->State == FActivity::EState::Failed);
// Notify the user we've received everything
Activity_CallSink(Activity);
}
// {{{1 work-queue .............................................................
/*
* - Activities (requests send with a loop) are managed in singly-linked lists
* - Each activity has an associated host it is talking to.
* - Hosts are ephemeral, or represented externally via a FConnectionPool object
* - Loop has a group for each host, and each host-group has a bunch of socket-groups
* - Host-group has a list of work; pending activities waiting to start
* - Socket-groups own up to two activities; one sending, one receiving
* - As it recvs, a socket-group will, if possible, fetch more work from the host
*
* Loop:
* FHostGroup[HostPtr]:
* Work: Act0 -> Act1 -> Act2 -> Act3 -> ...
* FPeerGroup[0...HostMaxConnections]:
* Act.Send
* Act.Recv
*/
////////////////////////////////////////////////////////////////////////////////
struct FTickState
{
FActivity* DoneList;
uint64 Cancels;
int32& RecvAllowance;
int32 PollTimeoutMs;
int32 FailTimeoutMs;
uint32 NowMs;
class FWorkQueue* Work;
};
////////////////////////////////////////////////////////////////////////////////
class FWorkQueue
{
public:
FWorkQueue() = default;
~FWorkQueue();
bool HasWork() const { return List != nullptr; }
void AddActivity(FActivity* Activity);
FActivity* PopActivity();
void TickCancels(FTickState& State);
private:
FActivity* List = nullptr;
FActivity* ListTail = nullptr;
uint64 ActiveSlots = 0;
UE_NONCOPYABLE(FWorkQueue);
};
////////////////////////////////////////////////////////////////////////////////
FWorkQueue::~FWorkQueue()
{
check(List == nullptr);
check(ListTail == nullptr);
}
////////////////////////////////////////////////////////////////////////////////
void FWorkQueue::AddActivity(FActivity* Activity)
{
// We use a tail pointer here to maintain order that requests were made
check(Activity->Next == nullptr);
if (ListTail != nullptr)
{
ListTail->Next = Activity;
}
List = (List == nullptr) ? Activity : List;
ListTail = Activity;
ActiveSlots |= (1ull << Activity->Slot);
}
////////////////////////////////////////////////////////////////////////////////
FActivity* FWorkQueue::PopActivity()
{
if (List == nullptr)
{
return nullptr;
}
FActivity* Activity = List;
if ((List = List->Next) == nullptr)
{
ListTail = nullptr;
}
check(ActiveSlots & (1ull << Activity->Slot));
ActiveSlots ^= (1ull << Activity->Slot);
Activity->Next = nullptr;
return Activity;
}
////////////////////////////////////////////////////////////////////////////////
void FWorkQueue::TickCancels(FTickState& State)
{
if (State.Cancels == 0 || (State.Cancels & ActiveSlots) == 0)
{
return;
}
// We are going to rebuild the list of activities to maintain order as the
// activity list is singular.
check(List != nullptr);
FActivity* Activity = List;
List = ListTail = nullptr;
ActiveSlots = 0;
for (FActivity* Next; Activity != nullptr; Activity = Next)
{
Next = Activity->Next;
if (uint64 Slot = (1ull << Activity->Slot); (State.Cancels & Slot) == 0)
{
Activity->Next = nullptr;
AddActivity(Activity);
continue;
}
DoCancel(Activity);
Activity->Next = State.DoneList;
State.DoneList = Activity;
}
}
// {{{1 peer-group .............................................................
////////////////////////////////////////////////////////////////////////////////
class FPeerGroup
{
public:
FPeerGroup() = default;
~FPeerGroup();
void Unwait() { check(bWaiting); bWaiting = false; }
FWaiter GetWaiter() const;
bool Tick(FTickState& State);
void TickSend(FTickState& State, FHost& Host, FPoller& Poller);
void Fail(FTickState& State, const char* Reason);
private:
void Negotiate(FTickState& State);
void RecvInternal(FTickState& State);
void SendInternal(FTickState& State);
FActivity* Send = nullptr;
FActivity* Recv = nullptr;
FHttpPeer Peer;
uint32 LastUseMs = 0;
uint8 IsKeepAlive = 0;
bool bNegotiating = false;
bool bWaiting = false;
UE_NONCOPYABLE(FPeerGroup);
};
////////////////////////////////////////////////////////////////////////////////
FPeerGroup::~FPeerGroup()
{
check(Send == nullptr);
check(Recv == nullptr);
}
////////////////////////////////////////////////////////////////////////////////
FWaiter FPeerGroup::GetWaiter() const
{
if (!bWaiting)
{
return FWaiter();
}
FWaitable Waitable = Peer.GetWaitable();
FWaiter Waiter(MoveTemp(Waitable));
Waiter.WaitFor((Recv != nullptr) ? FWaiter::EWhat::Recv : FWaiter::EWhat::Send);
return Waiter;
}
////////////////////////////////////////////////////////////////////////////////
void FPeerGroup::Fail(FTickState& State, const char* Reason)
{
// Any send left at this point is unrecoverable
if (Send != nullptr)
{
Send->Next = Recv;
Recv = Send;
}
// Failure is quite terminal and we need to abort everything
for (FActivity* Activity = Recv; Activity != nullptr;)
{
if (Activity->State != FActivity::EState::Failed)
{
Activity_SetError(Activity, Reason);
}
DoFail(Activity);
FActivity* Next = Activity->Next;
Activity->Next = State.DoneList;
State.DoneList = Activity;
Activity = Next;
}
Peer = FHttpPeer();
Send = Recv = nullptr;
bWaiting = false;
IsKeepAlive = 0;
bNegotiating = false;
}
////////////////////////////////////////////////////////////////////////////////
void FPeerGroup::Negotiate(FTickState& State)
{
check(bNegotiating);
check(Send != nullptr);
check(Peer.IsValid());
FOutcome Outcome = Peer.Handshake();
if (Outcome.IsError())
{
Fail(State, Outcome.GetMessage().GetData());
return;
}
if (Outcome.IsWaiting())
{
bWaiting = true;
return;
}
bNegotiating = false;
return SendInternal(State);
}
////////////////////////////////////////////////////////////////////////////////
void FPeerGroup::RecvInternal(FTickState& State)
{
check(bNegotiating == false);
check(Recv != nullptr);
// Another helper lambda
auto IsReceiving = [] (const FActivity* Act)
{
using EState = FActivity::EState;
return (Act->State >= EState::RecvMessage) & (Act->State < EState::RecvDone);
};
FActivity* Activity = Recv;
check(IsReceiving(Activity));
FOutcome Outcome = DoRecv(Activity, Peer, State.RecvAllowance);
// Any sort of error here is unrecoverable
if (Outcome.IsError())
{
Fail(State, Outcome.GetMessage().GetData());
return;
}
IsKeepAlive &= Activity->IsKeepAlive;
LastUseMs = State.NowMs;
bWaiting |= Outcome.IsWaiting();
// If we've only a small amount left to receive we can start more work
if (IsKeepAlive & (Recv->Next == nullptr) & (Send == nullptr))
{
uint32 Remaining = Activity_RemainingKiB(Activity);
if (Remaining < uint32(GRecvWorkThresholdKiB))
{
if (FActivity* Next = State.Work->PopActivity(); Next != nullptr)
{
Trace(Activity, ETrace::StartWork);
check(Send == nullptr);
Send = Next;
SendInternal(State);
if (!Peer.IsValid())
{
return;
}
}
}
}
// If there was no data available this is far as receiving can go
if (Outcome.IsWaiting())
{
return;
}
// If we're still in a receiving state we will just try again otherwise it
// is finished and we will let DoneList recipient finish it off.
if (IsReceiving(Activity))
{
return;
}
DoRecvDone(Activity);
Recv = Activity->Next;
Activity->Next = State.DoneList;
State.DoneList = Activity;
// If the server wants to close the socket we need to rewind the send
if (IsKeepAlive != 0)
{
return;
}
if (Send != nullptr && Activity_Rewind(Send) < 0)
{
Fail(State, "Unable to rewind on keep-alive close");
return;
}
Peer = FHttpPeer();
}
////////////////////////////////////////////////////////////////////////////////
void FPeerGroup::SendInternal(FTickState& State)
{
check(bNegotiating == false);
check(IsKeepAlive == 1);
check(Send != nullptr);
FActivity* Activity = Send;
FOutcome Outcome = DoSend(Activity, Peer);
if (Outcome.IsWaiting())
{
bWaiting = true;
return;
}
if (Outcome.IsError())
{
Fail(State, Outcome.GetMessage().GetData());
return;
}
Send = nullptr;
// Pass along this send to be received
if (Recv == nullptr)
{
Recv = Activity;
return;
}
check(Recv->Next == nullptr);
Recv->Next = Activity;
}
////////////////////////////////////////////////////////////////////////////////
bool FPeerGroup::Tick(FTickState& State)
{
if (bNegotiating)
{
Negotiate(State);
}
else if (Send != nullptr)
{
SendInternal(State);
}
if (Recv != nullptr && State.RecvAllowance)
{
RecvInternal(State);
}
return !!IsKeepAlive | !!(UPTRINT(Send) | UPTRINT(Recv));
}
////////////////////////////////////////////////////////////////////////////////
void FPeerGroup::TickSend(FTickState& State, FHost& Host, FPoller& Poller)
{
// This path is only for those that are idle and have nothing to do
if (Send != nullptr || Recv != nullptr)
{
return;
}
// Failing will try and recover work which we don't want to happen yet
FActivity* Pending = State.Work->PopActivity();
check(Pending != nullptr);
// Close idle sockets
if (Peer.IsValid() && LastUseMs + GIdleMs < State.NowMs)
{
LastUseMs = State.NowMs;
Peer = FHttpPeer();
}
// We don't have a connected socket on first use, or if a keep-alive:close
// was received from the server. So we connect here.
bool bWillBlock = false;
if (!Peer.IsValid())
{
FOutcome Outcome = FOutcome::None();
FSocket Socket;
if (Socket.Create())
{
FWaitable Waitable = Socket.GetWaitable();
Poller.Register(Waitable);
Outcome = Host.Connect(Socket);
}
else
{
Outcome = FOutcome::Error("Failed to create socket");
}
if (Outcome.IsError())
{
// We failed to connect, let's bail.
Pending->Next = Recv;
Recv = Pending;
Fail(State, Outcome.GetMessage().GetData());
return;
}
IsKeepAlive = 1;
bNegotiating = true;
bWillBlock = Outcome.IsWaiting();
FCertRootsRef VerifyCert = Host.GetVerifyCert();
const char* HostName = Host.GetHostName().GetData();
Peer = FHttpPeer(MoveTemp(Socket), VerifyCert, HostName);
}
Send = Pending;
if (!bWillBlock)
{
if (bNegotiating)
{
return Negotiate(State);
}
return SendInternal(State);
}
// Non-blocking connect
bWaiting = true;
}
// {{{1 host-group .............................................................
////////////////////////////////////////////////////////////////////////////////
class FHostGroup
{
public:
FHostGroup(FHost& InHost);
bool IsBusy() const { return BusyCount != 0; }
const FHost& GetHost() const { return Host; }
void Tick(FTickState& State);
void AddActivity(FActivity* Activity);
private:
int32 Wait(const FTickState& State);
TArray<FPeerGroup> PeerGroups;
FWorkQueue Work;
FHost& Host;
FPoller Poller;
uint32 BusyCount = 0;
int32 WaitTimeAccum = 0;
UE_NONCOPYABLE(FHostGroup);
};
////////////////////////////////////////////////////////////////////////////////
FHostGroup::FHostGroup(FHost& InHost)
: Host(InHost)
{
uint32 Num = InHost.GetMaxConnections();
PeerGroups.SetNum(Num);
}
////////////////////////////////////////////////////////////////////////////////
int32 FHostGroup::Wait(const FTickState& State)
{
// Collect groups that are waiting on something
TArray<FWaiter, TFixedAllocator<64>> Waiters;
for (uint32 i = 0, n = PeerGroups.Num(); i < n; ++i)
{
FWaiter Waiter = PeerGroups[i].GetWaiter();
if (!Waiter.IsValid())
{
continue;
}
Waiter.SetIndex(i);
Waiters.Add(MoveTemp(Waiter));
}
if (Waiters.IsEmpty())
{
return 0;
}
Trace(ETrace::Wait);
ON_SCOPE_EXIT { Trace(ETrace::Unwait); };
// If the poll timeout is negative then treat that as a fatal timeout
check(State.FailTimeoutMs);
int32 PollTimeoutMs = State.PollTimeoutMs;
if (PollTimeoutMs < 0)
{
PollTimeoutMs = State.FailTimeoutMs;
}
// Actually do the wait
int32 Result = FWaiter::Wait(Waiters, Poller, PollTimeoutMs);
if (Result <= 0)
{
// If the user opts to not block then we don't accumulate wait time and
// leave it to them to manage time a fail timoue
WaitTimeAccum += PollTimeoutMs;
if (State.PollTimeoutMs < 0 || WaitTimeAccum >= State.FailTimeoutMs)
{
return MIN_int32;
}
return Result;
}
WaitTimeAccum = 0;
// For each waiter that's ready, find the associated group "unwait" them.
int32 Count = 0;
for (int32 i = 0, n = Waiters.Num(); i < n; ++i)
{
if (!Waiters[i].IsReady())
{
continue;
}
uint32 Index = Waiters[i].GetIndex();
check(Index < uint32(PeerGroups.Num()));
PeerGroups[Index].Unwait();
Waiters.RemoveAtSwap(i, EAllowShrinking::No);
--n, --i, ++Count;
}
check(Count == Result);
return Result;
}
////////////////////////////////////////////////////////////////////////////////
void FHostGroup::Tick(FTickState& State)
{
State.Work = &Work;
if (BusyCount = Work.HasWork(); BusyCount)
{
Work.TickCancels(State);
// Get available work out on idle sockets as soon as possible
for (FPeerGroup& Group : PeerGroups)
{
if (!Work.HasWork())
{
break;
}
Group.TickSend(State, Host, Poller);
}
}
// Wait on the groups that are
if (int32 Result = Wait(State); Result < 0)
{
const char* Reason = (Result == MIN_int32)
? "FailTimeout hit"
: "poll() returned an unexpected error";
for (FPeerGroup& Group : PeerGroups)
{
Group.Fail(State, Reason);
}
return;
}
// Tick everything, starting with groups that are maybe closest to finishing
for (FPeerGroup& Group : PeerGroups)
{
BusyCount += (Group.Tick(State) == true);
}
}
////////////////////////////////////////////////////////////////////////////////
void FHostGroup::AddActivity(FActivity* Activity)
{
Work.AddActivity(Activity);
}
// {{{1 event-loop .............................................................
////////////////////////////////////////////////////////////////////////////////
static const FEventLoop::FRequestParams GDefaultParams;
////////////////////////////////////////////////////////////////////////////////
class FEventLoop::FImpl
{
public:
~FImpl();
uint32 Tick(int32 PollTimeoutMs=0);
bool IsIdle() const;
void Throttle(uint32 KiBPerSec);
void SetFailTimeout(int32 TimeoutMs);
void Cancel(FTicket Ticket);
FRequest Request(FAnsiStringView Method, FAnsiStringView Path, FActivity* Activity);
FTicket Send(FActivity* Activity);
private:
void ReceiveWork();
FCriticalSection Lock;
std::atomic<uint64> FreeSlots = ~0ull;
std::atomic<uint64> Cancels = 0;
uint64 PrevFreeSlots = ~0ull;
FActivity* Pending = nullptr;
FThrottler Throttler;
TArray<FHostGroup> Groups;
int32 FailTimeoutMs = GIdleMs;
uint32 BusyCount = 0;
};
////////////////////////////////////////////////////////////////////////////////
FEventLoop::FImpl::~FImpl()
{
check(BusyCount == 0);
}
////////////////////////////////////////////////////////////////////////////////
FRequest FEventLoop::FImpl::Request(
FAnsiStringView Method,
FAnsiStringView Path,
FActivity* Activity)
{
Trace(Activity, ETrace::ActivityCreate, 0);
Activity_ChangeState(Activity, FActivity::EState::Build);
if (Path.Len() == 0)
{
Path = "/";
}
Activity->NoContent = (Method == "HEAD");
auto& Buffer = Activity->Buffer;
Buffer.Begin(Method, Path);
Buffer.AddHeader("Host", Activity->Host->GetHostName());
// HTTP/1.1 is persistent by default thus "Connection" header isn't required
if (!Activity->IsKeepAlive)
{
Buffer.AddHeader("Connection", "close");
}
FRequest Ret;
Ret.Ptr = Activity;
return Ret;
}
////////////////////////////////////////////////////////////////////////////////
FTicket FEventLoop::FImpl::Send(FActivity* Activity)
{
Trace(Activity, ETrace::RequestBegin);
Activity->Buffer.End();
Activity_ChangeState(Activity, FActivity::EState::Send);
uint64 Slot;
{
FScopeLock _(&Lock);
for (;; FPlatformProcess::SleepNoStats(0.0f))
{
uint64 FreeSlotsLoad = FreeSlots.load(std::memory_order_relaxed);
if (!FreeSlotsLoad)
{
// we don't handle oversubscription at the moment. Could return
// activity to Reqeust and return a 0 ticket.
check(false);
}
Slot = -int64(FreeSlotsLoad) & FreeSlotsLoad;
if (FreeSlots.compare_exchange_weak(FreeSlotsLoad, FreeSlotsLoad - Slot, std::memory_order_relaxed))
{
break;
}
}
Activity->Slot = int8(63 - FMath::CountLeadingZeros64(Slot));
// This puts pending requests in reverse order of when they were made
// but this will be undone when ReceiveWork() traverses the list
Activity->Next = Pending;
Pending = Activity;
}
return Slot;
}
////////////////////////////////////////////////////////////////////////////////
bool FEventLoop::FImpl::IsIdle() const
{
return FreeSlots.load(std::memory_order_relaxed) == ~0ull;
}
////////////////////////////////////////////////////////////////////////////////
void FEventLoop::FImpl::Throttle(uint32 KiBPerSec)
{
Throttler.SetLimit(KiBPerSec);
}
////////////////////////////////////////////////////////////////////////////////
void FEventLoop::FImpl::SetFailTimeout(int32 TimeoutMs)
{
if (TimeoutMs > 0)
{
FailTimeoutMs = TimeoutMs;
}
else
{
FailTimeoutMs = GIdleMs; // Reset to default
}
}
////////////////////////////////////////////////////////////////////////////////
void FEventLoop::FImpl::Cancel(FTicket Ticket)
{
Cancels.fetch_or(Ticket, std::memory_order_relaxed);
}
////////////////////////////////////////////////////////////////////////////////
void FEventLoop::FImpl::ReceiveWork()
{
uint64 FreeSlotsLoad = FreeSlots.load(std::memory_order_relaxed);
if (FreeSlots == PrevFreeSlots)
{
return;
}
PrevFreeSlots = FreeSlotsLoad;
// Fetch the pending activities from out in the wild
FActivity* Activity = nullptr;
{
FScopeLock _(&Lock);
Swap(Activity, Pending);
}
// Pending is in the reverse of the order that requests were made
FActivity* Reverse = nullptr;
for (FActivity* Next; Activity != nullptr; Activity = Next)
{
Next = Activity->Next;
Activity->Next = Reverse;
Reverse = Activity;
}
Activity = Reverse;
// Group activities by their host.
for (FActivity* Next; Activity != nullptr; Activity = Next)
{
Next = Activity->Next;
Activity->Next = nullptr;
FHost& Host = *(Activity->Host);
auto Pred = [&Host] (const FHostGroup& Lhs) { return &Lhs.GetHost() == &Host; };
FHostGroup* Group = Groups.FindByPredicate(Pred);
if (Group == nullptr)
{
Group = &(Groups.Emplace_GetRef(Host));
}
Group->AddActivity(Activity);
++BusyCount;
}
}
////////////////////////////////////////////////////////////////////////////////
uint32 FEventLoop::FImpl::Tick(int32 PollTimeoutMs)
{
TRACE_CPUPROFILER_EVENT_SCOPE(IasHttp::Tick);
ReceiveWork();
// We limit recv sizes as a way to control bandwidth use.
int32 RecvAllowance = Throttler.GetAllowance();
if (RecvAllowance <= 0)
{
if (PollTimeoutMs == 0)
{
return BusyCount;
}
int32 ThrottleWaitMs = -RecvAllowance;
if (PollTimeoutMs > 0)
{
ThrottleWaitMs = FMath::Min(ThrottleWaitMs, PollTimeoutMs);
}
FPlatformProcess::SleepNoStats(float(ThrottleWaitMs) / 1000.0f);
RecvAllowance = Throttler.GetAllowance();
if (RecvAllowance <= 0)
{
return BusyCount;
}
}
uint64 CancelsLoad = Cancels.load(std::memory_order_relaxed);
uint32 NowMs;
{
// 4.2MM seconds will give us 50 days of uptime.
static uint64 Freq = 0;
static uint64 Base = 0;
if (Freq == 0)
{
Freq = uint64(1.0 / FPlatformTime::GetSecondsPerCycle64());
Base = FPlatformTime::Cycles64();
}
uint64 NowBig = ((FPlatformTime::Cycles64() - Base) * 1000) / Freq;
NowMs = uint32(NowBig);
check(NowMs == NowBig);
}
// Tick groups and then remove ones that are idle
FTickState TickState = {
.DoneList = nullptr,
.Cancels = CancelsLoad,
.RecvAllowance = RecvAllowance,
.PollTimeoutMs = PollTimeoutMs,
.FailTimeoutMs = FailTimeoutMs,
.NowMs = NowMs,
};
for (FHostGroup& Group : Groups)
{
Group.Tick(TickState);
}
for (uint32 i = 0, n = Groups.Num(); i < n; ++i)
{
FHostGroup& Group = Groups[i];
if (Group.IsBusy())
{
continue;
}
Groups.RemoveAtSwap(i, EAllowShrinking::No);
--n, --i;
}
Throttler.ReturnUnused(RecvAllowance);
uint64 ReturnedSlots = 0;
for (FActivity* Activity = TickState.DoneList; Activity != nullptr;)
{
FActivity* Next = Activity->Next;
ReturnedSlots |= (1ull << Activity->Slot);
Activity_Free(Activity);
--BusyCount;
Activity = Next;
}
uint32 BusyBias = 0;
if (ReturnedSlots)
{
uint64 LatestFree = FreeSlots.fetch_add(ReturnedSlots, std::memory_order_relaxed);
BusyBias += (LatestFree != PrevFreeSlots);
PrevFreeSlots += ReturnedSlots;
}
if (CancelsLoad)
{
Cancels.fetch_and(~CancelsLoad, std::memory_order_relaxed);
}
return BusyCount + BusyBias;
}
////////////////////////////////////////////////////////////////////////////////
FEventLoop::FEventLoop() { Impl = new FEventLoop::FImpl(); Trace(Impl, ETrace::LoopCreate); }
FEventLoop::~FEventLoop() { Trace(Impl, ETrace::LoopDestroy); delete Impl; }
uint32 FEventLoop::Tick(int32 PollTimeoutMs) { return Impl->Tick(PollTimeoutMs); }
bool FEventLoop::IsIdle() const { return Impl->IsIdle(); }
void FEventLoop::Cancel(FTicket Ticket) { return Impl->Cancel(Ticket); }
void FEventLoop::Throttle(uint32 KiBPerSec) { return Impl->Throttle(KiBPerSec); }
void FEventLoop::SetFailTimeout(int32 Ms) { return Impl->SetFailTimeout(Ms); }
////////////////////////////////////////////////////////////////////////////////
FRequest FEventLoop::Request(
FAnsiStringView Method,
FAnsiStringView Url,
const FRequestParams* Params)
{
// Parse the URL into its components
FUrlOffsets UrlOffsets;
if (ParseUrl(Url, UrlOffsets) < 0)
{
return FRequest();
}
FAnsiStringView HostName = UrlOffsets.HostName.Get(Url);
uint32 Port = 0;
if (UrlOffsets.Port)
{
FAnsiStringView PortView = UrlOffsets.Port.Get(Url);
Port = uint32(CrudeToInt(PortView));
}
FAnsiStringView Path;
if (UrlOffsets.Path > 0)
{
Path = Url.Mid(UrlOffsets.Path);
}
// Create an activity and an emphemeral host
Params = (Params != nullptr) ? Params : &GDefaultParams;
FCertRootsRef VerifyCert = FCertRoots::NoTls();
if (UrlOffsets.SchemeLength == 5)
{
if (VerifyCert = Params->VerifyCert; VerifyCert == ECertRootsRefType::None)
{
VerifyCert = FCertRoots::Default();
}
check(VerifyCert != ECertRootsRefType::None);
}
uint32 BufferSize = Params->BufferSize;
BufferSize = (BufferSize >= 128) ? BufferSize : 128;
BufferSize += sizeof(FHost) + HostName.Len();
FActivity* Activity = Activity_Alloc(BufferSize);
FBuffer& Buffer = Activity->Buffer;
FHost* Host = Activity->Host = Buffer.Alloc<FHost>();
Activity->IsKeepAlive = 0;
Activity->bFollow30x = (Params->bAutoRedirect == true);
Activity->bAllowChunked = (Params->bAllowChunked == true);
uint32 HostNameLength = HostName.Len();
char* HostNamePtr = Buffer.Alloc<char>(HostNameLength + 1);
Buffer.Fix();
Activity_SetScore(Activity, Params->ContentSizeEst);
memcpy(HostNamePtr, HostName.GetData(), HostNameLength);
HostNamePtr[HostNameLength] = '\0';
new (Host) FHost({
.HostName = HostNamePtr,
.Port = Port,
.VerifyCert = VerifyCert,
});
return Impl->Request(Method, Path, Activity);
}
////////////////////////////////////////////////////////////////////////////////
FRequest FEventLoop::Request(
FAnsiStringView Method,
FAnsiStringView Path,
FConnectionPool& Pool,
const FRequestParams* Params)
{
check(Pool.Ptr != nullptr);
check(Params == nullptr || Params->VerifyCert == ECertRootsRefType::None); // add cert to FConPool instead
Params = (Params != nullptr) ? Params : &GDefaultParams;
uint32 BufferSize = Params->BufferSize;
BufferSize = (BufferSize >= 128) ? BufferSize : 128;
FActivity* Activity = Activity_Alloc(BufferSize);
Activity->Host = Pool.Ptr;
Activity->IsKeepAlive = 1;
Activity->bFollow30x = (Params->bAutoRedirect == true);
Activity->bAllowChunked = (Params->bAllowChunked == true);
Activity->LengthScore = 0;
Activity_SetScore(Activity, Params->ContentSizeEst);
return Impl->Request(Method, Path, Activity);
}
////////////////////////////////////////////////////////////////////////////////
bool FEventLoop::Redirect(const FTicketStatus& Status, FTicketSink& OuterSink)
{
const FResponse& Response = Status.GetResponse();
switch (Response.GetStatusCode())
{
case 301: // RedirectMoved
case 302: // RedirectFound
case 307: // RedirectTemp
case 308: break; // RedirectPerm
default: return false;
}
FAnsiStringView Location = Response.GetHeader("Location");
if (Location.IsEmpty())
{
// todo: turn source activity into an error?
return false;
}
check(Response.GetContentLength() == 0); // should we ever hit this, we'll fix it
const auto& Activity = (FActivity&)Response; // todo: yuk
// Original method should remain unchanged
FAnsiStringView Method = Activity.Buffer.GetMethod();
check(!Method.IsEmpty());
FRequest ForwardRequest;
if (!Location.StartsWith("http://") && !Location.StartsWith("https://"))
{
if (Location[0] != '/')
{
return false;
}
FHost& Host = *(Activity.Host);
TAnsiStringBuilder<256> Url;
Url << ((Host.GetVerifyCert() != ECertRootsRefType::None) ? "https" : "http");
Url << "://";
Url << Host.GetHostName();
Url << ":" << Host.GetPort();
Url << Location;
FRequestParams RequestParams = {
.VerifyCert = Host.GetVerifyCert(),
};
new (&ForwardRequest) FRequest(Request(Method, Url, &RequestParams));
}
else
{
new (&ForwardRequest) FRequest(Request(Method, Location));
}
// Transfer original request headers
check(Activity.State == FActivity::EState::RecvMessage);
Activity.Buffer.EnumerateHeaders([&ForwardRequest] (FAnsiStringView Name, FAnsiStringView Value)
{
if (Name != "Host" && Name != "Connection")
{
ForwardRequest.Header(Name, Value);
}
return true;
});
// Send the request
Send(MoveTemp(ForwardRequest), MoveTemp(OuterSink), Status.GetParam());
// todo: activity slots should be swapped so original slot matches ticket
return true;
}
////////////////////////////////////////////////////////////////////////////////
FTicket FEventLoop::Send(FRequest&& Request, FTicketSink Sink, UPTRINT SinkParam)
{
FActivity* Activity = nullptr;
Swap(Activity, Request.Ptr);
Activity->SinkParam = SinkParam;
Activity->Sink = MoveTemp(Sink);
// Intercept sink calls to catch 30x status codes and follow them
if (Activity->bFollow30x)
{
auto RedirectSink = [
this,
OuterSink=MoveTemp(Activity->Sink)
] (const FTicketStatus& Status) mutable
{
if (Status.GetId() == FTicketStatus::EId::Response)
{
if (Redirect(Status, OuterSink))
{
return;
}
}
if (OuterSink)
{
return OuterSink(Status);
}
};
Activity->Sink = RedirectSink;
}
return Impl->Send(Activity);
}
// }}}
} // namespace UE::IoStore::HTTP