// Copyright Epic Games, Inc. All Rights Reserved. #pragma once namespace UE::IoStore::HTTP { // {{{1 event-loop-int ......................................................... //////////////////////////////////////////////////////////////////////////////// static FOutcome DoSend(FActivity* Activity, FHttpPeer& Peer) { #if IAS_HTTP_WITH_PERF Activity->Stopwatch.SendStart(); #endif TRACE_CPUPROFILER_EVENT_SCOPE(IasHttp::DoSend); FBuffer& Buffer = Activity->Buffer; const char* SendData = Buffer.GetData(); int32 SendSize = Buffer.GetSize(); uint32 AlreadySent = Activity->StateParam; SendData += AlreadySent; SendSize -= AlreadySent; check(SendSize > 0); FOutcome Outcome = Peer.Send(SendData, SendSize); if (Outcome.IsError()) { Activity_SetError(Activity, Outcome); return Outcome; } if (Outcome.IsWaiting()) { return Outcome; } check(Outcome.IsOk()); int32 Result = Outcome.GetResult(); Activity->StateParam += Result; if (Activity->StateParam < Buffer.GetSize()) { return DoSend(Activity, Peer); } #if IAS_HTTP_WITH_PERF Activity->Stopwatch.SendEnd(); #endif Activity_ChangeState(Activity, FActivity::EState::RecvMessage, Buffer.GetSize()); return FOutcome::Ok(Result); } //////////////////////////////////////////////////////////////////////////////// static FOutcome DoRecvPeer(FActivity* Activity, FHttpPeer& Peer, int32& MaxRecvSize, int32 Size) { Size = FMath::Min(Size, MaxRecvSize); check(Size >= 0); if (Size == 0) { return FOutcome::Waiting(); } FMutableMemoryView DestView = Activity->Dest->GetMutableView(); char* Cursor = (char*)(DestView.GetData()) + Activity->StateParam; check(Size + Activity->StateParam <= DestView.GetSize()); FOutcome Outcome = Peer.Recv(Cursor, Size); if (Outcome.IsWaiting()) { return Outcome; } if (Outcome.IsError()) { Activity_SetError(Activity, Outcome); return Outcome; } check(Outcome.IsOk()); int32 Result = Outcome.GetResult(); Activity->StateParam += Result; MaxRecvSize = FMath::Max(MaxRecvSize - Result, 0); return Outcome; } //////////////////////////////////////////////////////////////////////////////// static FOutcome DoRecvMessage(FActivity* Activity, FHttpPeer& Peer) { TRACE_CPUPROFILER_EVENT_SCOPE(IasHttp::DoRecvMessage); static const uint32 PageSize = 256; FBuffer& Buffer = Activity->Buffer; const char* MessageRight; while (true) { Trace(Activity, ETrace::StateChange, uint32(Activity->State)); #if IAS_HTTP_WITH_PERF Activity->Stopwatch.RecvStart(); #endif auto [Dest, DestSize] = Buffer.GetMutableFree(0, PageSize); FOutcome Outcome = Peer.Recv(Dest, DestSize); if (Outcome.IsError()) { Activity_SetError(Activity, Outcome); return Outcome; } if (Outcome.IsWaiting()) { return Outcome; } check(Outcome.IsOk()); int32 Result = Outcome.GetResult(); Buffer.AdvanceUsed(Result); // Rewind a little to cover cases where the terminal is fragmented across // recv() calls uint32 DestBias = 0; if (Dest - 3 >= Buffer.GetData() + Activity->StateParam) { Dest -= (DestBias = 3); } int32 MessageEnd = FindMessageTerminal(Dest, Result + DestBias); if (MessageEnd < 0) { if (Buffer.GetSize() > (8 << 10)) { Activity_SetError(Activity, "Headers have grown larger than expected"); return FOutcome::Error(Activity->ErrorReason); } continue; } MessageRight = Dest + MessageEnd; break; } // Fill out the internal response object FResponseInternal& Internal = Activity->Response; const char* MessageData = Buffer.GetData() + Activity->StateParam; Internal.MessageLength = uint16(ptrdiff_t(MessageRight - MessageData)); FAnsiStringView ResponseView(MessageData, Internal.MessageLength); if (ParseMessage(ResponseView, Internal.Offsets) < 0) { Activity_SetError(Activity, "Failed to parse message status"); return FOutcome::Error(Activity->ErrorReason); } // Parse headers FAnsiStringView Headers = ResponseView.Mid( Internal.Offsets.Headers, Internal.MessageLength - Internal.Offsets.Headers - 2 // "-2" trims off '\r\n' that signals end of headers ); int32 Count = 3; bool bChunked = false; bool IsKeepAlive = true; int32 ContentLength = Activity->NoContent ? 0 : -1; EnumerateHeaders( Headers, [&ContentLength, &bChunked, &IsKeepAlive, &Count] (FAnsiStringView Name, FAnsiStringView Value) { // todo; may need smarter value handling; ;/, separated options & key-value pairs (ex. in rfc2068) if (Name.Equals("Content-Length", ESearchCase::IgnoreCase)) { ContentLength = int32(CrudeToInt(Value)); Count--; } else if (Name.Equals("Transfer-Encoding", ESearchCase::IgnoreCase)) { bChunked = Value.Equals("chunked", ESearchCase::IgnoreCase); Count--; } else if (Name.Equals("Connection", ESearchCase::IgnoreCase)) { IsKeepAlive = !Value.Equals("close"); Count--; } return Count > 0; } ); Activity->IsKeepAlive &= IsKeepAlive; // Validate that the server's told us how and how much it will transmit if (bChunked) { if (Activity->bAllowChunked == 0) { Activity_SetError(Activity, "Chunked transfer encoding disabled (ERRNOCHUNK)"); return FOutcome::Error(Activity->ErrorReason); } ContentLength = -1; } else if (ContentLength < 0) { Activity_SetError(Activity, "Missing/invalid Content-Length header"); return FOutcome::Error(Activity->ErrorReason); } // Call out to the sink to get a content destination FIoBuffer* PriorDest = Activity->Dest; // to retain unioned Host ptr (redirect uses it in sink) Internal.Code = -1; Internal.ContentLength = ContentLength; Activity_CallSink(Activity); Activity->NoContent |= (ContentLength == 0); // Check the user gave us a destination for content FIoBuffer& Dest = *(Activity->Dest); if (Activity->NoContent == 0) { if (&Dest == PriorDest) { Activity_SetError(Activity, "User did not provide a destination buffer"); return FOutcome::Error(Activity->ErrorReason); } // The user seems to have forgotten something. Let's help them along if (int32 DestSize = int32(Dest.GetSize()); DestSize == 0) { static const uint32 DefaultChunkSize = 4 << 10; uint32 Size = bChunked ? DefaultChunkSize : ContentLength; Dest = FIoBuffer(Size); } else if (!bChunked && DestSize < ContentLength) { // todo: support piece-wise transfer of content (a la chunked). Activity_SetError(Activity, "Destination buffer too small"); return FOutcome::Error(Activity->ErrorReason); } else if (enum { MinStreamBuf = 256 }; bChunked && DestSize < MinStreamBuf) { Dest = FIoBuffer(MinStreamBuf); } } // Perhaps we have some of the content already? const char* BufferRight = Buffer.GetData() + Buffer.GetSize(); uint32 AlreadyReceived = uint32(ptrdiff_t(BufferRight - MessageRight)); if (AlreadyReceived > uint32(ContentLength)) { Activity_SetError(Activity, "More data received that expected"); return FOutcome::Error(Activity->ErrorReason); } // HEAD methods if (Activity->NoContent == 1) { if (AlreadyReceived) { Activity_SetError(Activity, "Received content when none was expected"); return FOutcome::Error(Activity->ErrorReason); } Activity_ChangeState(Activity, FActivity::EState::RecvDone); return FOutcome::Ok(); } // We're all set to go and get content check(Activity->Dest != nullptr); auto NextState = bChunked ? FActivity::EState::RecvStream : FActivity::EState::RecvContent; Activity_ChangeState(Activity, NextState, AlreadyReceived); // Copy any of the content we may have already received. if (AlreadyReceived == 0) { return FOutcome::Ok(); } // This ordinarily doesn't happen due to the way higher levels pipeline // requests. It can however occur with chunked transfers. if (AlreadyReceived > Dest.GetSize()) { Dest = FIoBuffer(AlreadyReceived); } FMutableMemoryView DestView = Dest.GetMutableView(); const char* Cursor = BufferRight - AlreadyReceived; ::memcpy(DestView.GetData(), Cursor, AlreadyReceived); return FOutcome::Ok(); } //////////////////////////////////////////////////////////////////////////////// static FOutcome DoRecvContent(FActivity* Activity, FHttpPeer& Peer, int32& MaxRecvSize) { TRACE_CPUPROFILER_EVENT_SCOPE(IasHttp::DoRecvContent); while (true) { const FResponseInternal& Response = Activity->Response; int32 Size = (Response.ContentLength - Activity->StateParam); if (Size == 0) { break; } FOutcome Outcome = DoRecvPeer(Activity, Peer, MaxRecvSize, Size); if (!Outcome.IsOk()) { return Outcome; } } #if IAS_HTTP_WITH_PERF Activity->Stopwatch.RecvEnd(); #endif Activity_ChangeState(Activity, FActivity::EState::RecvDone); return FOutcome::Ok(); } //////////////////////////////////////////////////////////////////////////////// static FOutcome DoRecvStream(FActivity* Activity, FHttpPeer& Peer, int32& MaxRecvSize) { auto RaiseCrLfError = [Activity] () { if (Activity->NoContent) { Activity_SetError(Activity, "Trailing headers are not supported (ERRTRAIL)"); return FOutcome::Error(Activity->ErrorReason); } Activity_SetError(Activity, "Expected CRLF chunk terminal"); return FOutcome::Error(Activity->ErrorReason); }; auto SinkData = [Activity] (FMemoryView View) { if (View.GetSize() == 0) { return; } // Temporarily clamp IoBuffer so if the sink does GetView/GetSize() it // represents actual content and not the underlying working buffer. FIoBuffer& Dest = *(Activity->Dest); FIoBuffer Slice(View, Dest); Swap(Dest, Slice); Activity_CallSink(Activity); Swap(Dest, Slice); }; auto Done = [Activity] () { #if IAS_HTTP_WITH_PERF Activity->Stopwatch.RecvEnd(); #endif *(Activity->Dest) = FIoBuffer(); Activity_ChangeState(Activity, FActivity::EState::RecvDone); return FOutcome::Ok(); }; enum { CrLfLength = 2 }; int32 Size = int32(Activity->StateParam); // Trailing chunk data. while (Size < 0) { Activity->StateParam = 0; Size = -Size; int32 RefillSize = FMath::Min(Size, int32(Activity->Dest->GetSize())); FOutcome Outcome = DoRecvPeer(Activity, Peer, MaxRecvSize, RefillSize); if (!Outcome.IsOk()) { Activity->StateParam = 0 - Size; return Outcome; } int32 Result = Outcome.GetResult(); check(Result > 0); FMemoryView View = Activity->Dest->GetView(); if (Size > CrLfLength) { int32 SinkSize = Size - CrLfLength; SinkSize = FMath::Min(Result, SinkSize); SinkData(View.Left(SinkSize)); View = View.Mid(SinkSize); Size -= SinkSize; Result -= SinkSize; } const char* Cursor = (char*)View.GetData(); int32 CrLfError = 0; if (int32 n = CrLfLength; Size == n && Result >= n) { CrLfError |= (Cursor[0] != '\r'); --Size; --Result; ++Cursor; } if (int32 n = CrLfLength - 1; Size == n && Result >= n) { CrLfError |= (Cursor[0] != '\n'); --Size; --Result; } if (CrLfError) { return RaiseCrLfError(); } Size = Result - Size; Activity->StateParam = Size; check(Size <= 0); // Have we found the trailer-section that follows last-chunk? if (Size == 0 && Activity->NoContent) { return Done(); } } // Peel off chunks for (FMemoryView View = Activity->Dest->GetView(); Size > 0;) { const char* Cursor = (char*)(View.GetData()); // Isolate chunk size int32 ChunkSize = -1; uint32 HeaderLength = 0; for (; HeaderLength < uint32(Size - 1); ++HeaderLength) { // Detect CRLF. if (Cursor[HeaderLength + 1] != '\n') { continue; } ++HeaderLength; if (Cursor[HeaderLength - 1] != '\r') { continue; } ++HeaderLength; ChunkSize = int32(CrudeToInt<16>(Cursor)); if (ChunkSize < 0) { Activity_SetError(Activity, "Unparsable chunk size"); return FOutcome::Error(Activity->ErrorReason); } break; } // Maybe we were not able to find a CRLF terminator and need more data if (ChunkSize < 0) { FMutableMemoryView WriteView = Activity->Dest->GetMutableView(); std::memmove(WriteView.GetData(), Cursor, Size); Activity->StateParam = Size; break; } check(ChunkSize >= 0); Size -= HeaderLength; // Dispatch as much data as we can. uint32 SinkSize = FMath::Min(ChunkSize, uint32(Size)); SinkData(View.Mid(HeaderLength, SinkSize)); View = View.Mid(HeaderLength + SinkSize); Activity->StateParam = (Size -= ChunkSize); Activity->NoContent = (ChunkSize == 0); // A CRLF follows a chunk's data Cursor = (char*)View.GetData(); int32 CrLfError = 0; CrLfError |= (Size >= (CrLfLength - 1)) && Cursor[0] != '\r'; CrLfError |= (Size >= (CrLfLength - 0)) && Cursor[1] != '\n'; if (CrLfError != 0) { return RaiseCrLfError(); } // Can we do CRLF now? if (Size >= CrLfLength) { // Have we found the trailer-section that follows last-chunk? if (Activity->NoContent) { return Done(); } Activity->StateParam = (Size -= CrLfLength); View = View.Mid(CrLfLength); continue; } Activity->StateParam -= CrLfLength; check(int32(Activity->StateParam) < 0); break; } // Refill if (int32(Activity->StateParam) >= 0) { uint32 RefillSize = uint32(Activity->Dest->GetSize()) - Activity->StateParam; FOutcome Outcome = DoRecvPeer(Activity, Peer, MaxRecvSize, RefillSize); if (!Outcome.IsOk()) { return Outcome; } } return DoRecvStream(Activity, Peer, MaxRecvSize); } //////////////////////////////////////////////////////////////////////////////// static FOutcome DoRecv(FActivity* Activity, FHttpPeer& Peer, int32& MaxRecvSize) { using EState = FActivity::EState; EState State = Activity->State; check(State >= EState::RecvMessage && State < EState::RecvDone); if (State == EState::RecvMessage) return DoRecvMessage(Activity, Peer); if (State == EState::RecvContent) return DoRecvContent(Activity, Peer, MaxRecvSize); if (State == EState::RecvStream) return DoRecvStream(Activity, Peer, MaxRecvSize); //-V547 check(false); // it is not expected that we'll get here return FOutcome::Error("unreachable"); } //////////////////////////////////////////////////////////////////////////////// static void DoRecvDone(FActivity* Activity) { // Notify the user we've received everything Activity_CallSink(Activity); Activity_ChangeState(Activity, FActivity::EState::Completed); } //////////////////////////////////////////////////////////////////////////////// static void DoCancel(FActivity* Activity) { if (Activity->State >= FActivity::EState::Completed) { return; } Activity_ChangeState(Activity, FActivity::EState::Cancelled); Activity_CallSink(Activity); } //////////////////////////////////////////////////////////////////////////////// static void DoFail(FActivity* Activity) { check(Activity->State == FActivity::EState::Failed); // Notify the user we've received everything Activity_CallSink(Activity); } // {{{1 work-queue ............................................................. /* * - Activities (requests send with a loop) are managed in singly-linked lists * - Each activity has an associated host it is talking to. * - Hosts are ephemeral, or represented externally via a FConnectionPool object * - Loop has a group for each host, and each host-group has a bunch of socket-groups * - Host-group has a list of work; pending activities waiting to start * - Socket-groups own up to two activities; one sending, one receiving * - As it recvs, a socket-group will, if possible, fetch more work from the host * * Loop: * FHostGroup[HostPtr]: * Work: Act0 -> Act1 -> Act2 -> Act3 -> ... * FPeerGroup[0...HostMaxConnections]: * Act.Send * Act.Recv */ //////////////////////////////////////////////////////////////////////////////// struct FTickState { FActivity* DoneList; uint64 Cancels; int32& RecvAllowance; int32 PollTimeoutMs; int32 FailTimeoutMs; uint32 NowMs; class FWorkQueue* Work; }; //////////////////////////////////////////////////////////////////////////////// class FWorkQueue { public: FWorkQueue() = default; ~FWorkQueue(); bool HasWork() const { return List != nullptr; } void AddActivity(FActivity* Activity); FActivity* PopActivity(); void TickCancels(FTickState& State); private: FActivity* List = nullptr; FActivity* ListTail = nullptr; uint64 ActiveSlots = 0; UE_NONCOPYABLE(FWorkQueue); }; //////////////////////////////////////////////////////////////////////////////// FWorkQueue::~FWorkQueue() { check(List == nullptr); check(ListTail == nullptr); } //////////////////////////////////////////////////////////////////////////////// void FWorkQueue::AddActivity(FActivity* Activity) { // We use a tail pointer here to maintain order that requests were made check(Activity->Next == nullptr); if (ListTail != nullptr) { ListTail->Next = Activity; } List = (List == nullptr) ? Activity : List; ListTail = Activity; ActiveSlots |= (1ull << Activity->Slot); } //////////////////////////////////////////////////////////////////////////////// FActivity* FWorkQueue::PopActivity() { if (List == nullptr) { return nullptr; } FActivity* Activity = List; if ((List = List->Next) == nullptr) { ListTail = nullptr; } check(ActiveSlots & (1ull << Activity->Slot)); ActiveSlots ^= (1ull << Activity->Slot); Activity->Next = nullptr; return Activity; } //////////////////////////////////////////////////////////////////////////////// void FWorkQueue::TickCancels(FTickState& State) { if (State.Cancels == 0 || (State.Cancels & ActiveSlots) == 0) { return; } // We are going to rebuild the list of activities to maintain order as the // activity list is singular. check(List != nullptr); FActivity* Activity = List; List = ListTail = nullptr; ActiveSlots = 0; for (FActivity* Next; Activity != nullptr; Activity = Next) { Next = Activity->Next; if (uint64 Slot = (1ull << Activity->Slot); (State.Cancels & Slot) == 0) { Activity->Next = nullptr; AddActivity(Activity); continue; } DoCancel(Activity); Activity->Next = State.DoneList; State.DoneList = Activity; } } // {{{1 peer-group ............................................................. //////////////////////////////////////////////////////////////////////////////// class FPeerGroup { public: FPeerGroup() = default; ~FPeerGroup(); void Unwait() { check(bWaiting); bWaiting = false; } FWaiter GetWaiter() const; bool Tick(FTickState& State); void TickSend(FTickState& State, FHost& Host, FPoller& Poller); void Fail(FTickState& State, const char* Reason); private: void Negotiate(FTickState& State); void RecvInternal(FTickState& State); void SendInternal(FTickState& State); FActivity* Send = nullptr; FActivity* Recv = nullptr; FHttpPeer Peer; uint32 LastUseMs = 0; uint8 IsKeepAlive = 0; bool bNegotiating = false; bool bWaiting = false; UE_NONCOPYABLE(FPeerGroup); }; //////////////////////////////////////////////////////////////////////////////// FPeerGroup::~FPeerGroup() { check(Send == nullptr); check(Recv == nullptr); } //////////////////////////////////////////////////////////////////////////////// FWaiter FPeerGroup::GetWaiter() const { if (!bWaiting) { return FWaiter(); } FWaitable Waitable = Peer.GetWaitable(); FWaiter Waiter(MoveTemp(Waitable)); Waiter.WaitFor((Recv != nullptr) ? FWaiter::EWhat::Recv : FWaiter::EWhat::Send); return Waiter; } //////////////////////////////////////////////////////////////////////////////// void FPeerGroup::Fail(FTickState& State, const char* Reason) { // Any send left at this point is unrecoverable if (Send != nullptr) { Send->Next = Recv; Recv = Send; } // Failure is quite terminal and we need to abort everything for (FActivity* Activity = Recv; Activity != nullptr;) { if (Activity->State != FActivity::EState::Failed) { Activity_SetError(Activity, Reason); } DoFail(Activity); FActivity* Next = Activity->Next; Activity->Next = State.DoneList; State.DoneList = Activity; Activity = Next; } Peer = FHttpPeer(); Send = Recv = nullptr; bWaiting = false; IsKeepAlive = 0; bNegotiating = false; } //////////////////////////////////////////////////////////////////////////////// void FPeerGroup::Negotiate(FTickState& State) { check(bNegotiating); check(Send != nullptr); check(Peer.IsValid()); FOutcome Outcome = Peer.Handshake(); if (Outcome.IsError()) { Fail(State, Outcome.GetMessage().GetData()); return; } if (Outcome.IsWaiting()) { bWaiting = true; return; } bNegotiating = false; return SendInternal(State); } //////////////////////////////////////////////////////////////////////////////// void FPeerGroup::RecvInternal(FTickState& State) { check(bNegotiating == false); check(Recv != nullptr); // Another helper lambda auto IsReceiving = [] (const FActivity* Act) { using EState = FActivity::EState; return (Act->State >= EState::RecvMessage) & (Act->State < EState::RecvDone); }; FActivity* Activity = Recv; check(IsReceiving(Activity)); FOutcome Outcome = DoRecv(Activity, Peer, State.RecvAllowance); // Any sort of error here is unrecoverable if (Outcome.IsError()) { Fail(State, Outcome.GetMessage().GetData()); return; } IsKeepAlive &= Activity->IsKeepAlive; LastUseMs = State.NowMs; bWaiting |= Outcome.IsWaiting(); // If we've only a small amount left to receive we can start more work if (IsKeepAlive & (Recv->Next == nullptr) & (Send == nullptr)) { uint32 Remaining = Activity_RemainingKiB(Activity); if (Remaining < uint32(GRecvWorkThresholdKiB)) { if (FActivity* Next = State.Work->PopActivity(); Next != nullptr) { Trace(Activity, ETrace::StartWork); check(Send == nullptr); Send = Next; SendInternal(State); if (!Peer.IsValid()) { return; } } } } // If there was no data available this is far as receiving can go if (Outcome.IsWaiting()) { return; } // If we're still in a receiving state we will just try again otherwise it // is finished and we will let DoneList recipient finish it off. if (IsReceiving(Activity)) { return; } DoRecvDone(Activity); Recv = Activity->Next; Activity->Next = State.DoneList; State.DoneList = Activity; // If the server wants to close the socket we need to rewind the send if (IsKeepAlive != 0) { return; } if (Send != nullptr && Activity_Rewind(Send) < 0) { Fail(State, "Unable to rewind on keep-alive close"); return; } Peer = FHttpPeer(); } //////////////////////////////////////////////////////////////////////////////// void FPeerGroup::SendInternal(FTickState& State) { check(bNegotiating == false); check(IsKeepAlive == 1); check(Send != nullptr); FActivity* Activity = Send; FOutcome Outcome = DoSend(Activity, Peer); if (Outcome.IsWaiting()) { bWaiting = true; return; } if (Outcome.IsError()) { Fail(State, Outcome.GetMessage().GetData()); return; } Send = nullptr; // Pass along this send to be received if (Recv == nullptr) { Recv = Activity; return; } check(Recv->Next == nullptr); Recv->Next = Activity; } //////////////////////////////////////////////////////////////////////////////// bool FPeerGroup::Tick(FTickState& State) { if (bNegotiating) { Negotiate(State); } else if (Send != nullptr) { SendInternal(State); } if (Recv != nullptr && State.RecvAllowance) { RecvInternal(State); } return !!IsKeepAlive | !!(UPTRINT(Send) | UPTRINT(Recv)); } //////////////////////////////////////////////////////////////////////////////// void FPeerGroup::TickSend(FTickState& State, FHost& Host, FPoller& Poller) { // This path is only for those that are idle and have nothing to do if (Send != nullptr || Recv != nullptr) { return; } // Failing will try and recover work which we don't want to happen yet FActivity* Pending = State.Work->PopActivity(); check(Pending != nullptr); // Close idle sockets if (Peer.IsValid() && LastUseMs + GIdleMs < State.NowMs) { LastUseMs = State.NowMs; Peer = FHttpPeer(); } // We don't have a connected socket on first use, or if a keep-alive:close // was received from the server. So we connect here. bool bWillBlock = false; if (!Peer.IsValid()) { FOutcome Outcome = FOutcome::None(); FSocket Socket; if (Socket.Create()) { FWaitable Waitable = Socket.GetWaitable(); Poller.Register(Waitable); Outcome = Host.Connect(Socket); } else { Outcome = FOutcome::Error("Failed to create socket"); } if (Outcome.IsError()) { // We failed to connect, let's bail. Pending->Next = Recv; Recv = Pending; Fail(State, Outcome.GetMessage().GetData()); return; } IsKeepAlive = 1; bNegotiating = true; bWillBlock = Outcome.IsWaiting(); FCertRootsRef VerifyCert = Host.GetVerifyCert(); const char* HostName = Host.GetHostName().GetData(); Peer = FHttpPeer(MoveTemp(Socket), VerifyCert, HostName); } Send = Pending; if (!bWillBlock) { if (bNegotiating) { return Negotiate(State); } return SendInternal(State); } // Non-blocking connect bWaiting = true; } // {{{1 host-group ............................................................. //////////////////////////////////////////////////////////////////////////////// class FHostGroup { public: FHostGroup(FHost& InHost); bool IsBusy() const { return BusyCount != 0; } const FHost& GetHost() const { return Host; } void Tick(FTickState& State); void AddActivity(FActivity* Activity); private: int32 Wait(const FTickState& State); TArray PeerGroups; FWorkQueue Work; FHost& Host; FPoller Poller; uint32 BusyCount = 0; int32 WaitTimeAccum = 0; UE_NONCOPYABLE(FHostGroup); }; //////////////////////////////////////////////////////////////////////////////// FHostGroup::FHostGroup(FHost& InHost) : Host(InHost) { uint32 Num = InHost.GetMaxConnections(); PeerGroups.SetNum(Num); } //////////////////////////////////////////////////////////////////////////////// int32 FHostGroup::Wait(const FTickState& State) { // Collect groups that are waiting on something TArray> Waiters; for (uint32 i = 0, n = PeerGroups.Num(); i < n; ++i) { FWaiter Waiter = PeerGroups[i].GetWaiter(); if (!Waiter.IsValid()) { continue; } Waiter.SetIndex(i); Waiters.Add(MoveTemp(Waiter)); } if (Waiters.IsEmpty()) { return 0; } Trace(ETrace::Wait); ON_SCOPE_EXIT { Trace(ETrace::Unwait); }; // If the poll timeout is negative then treat that as a fatal timeout check(State.FailTimeoutMs); int32 PollTimeoutMs = State.PollTimeoutMs; if (PollTimeoutMs < 0) { PollTimeoutMs = State.FailTimeoutMs; } // Actually do the wait int32 Result = FWaiter::Wait(Waiters, Poller, PollTimeoutMs); if (Result <= 0) { // If the user opts to not block then we don't accumulate wait time and // leave it to them to manage time a fail timoue WaitTimeAccum += PollTimeoutMs; if (State.PollTimeoutMs < 0 || WaitTimeAccum >= State.FailTimeoutMs) { return MIN_int32; } return Result; } WaitTimeAccum = 0; // For each waiter that's ready, find the associated group "unwait" them. int32 Count = 0; for (int32 i = 0, n = Waiters.Num(); i < n; ++i) { if (!Waiters[i].IsReady()) { continue; } uint32 Index = Waiters[i].GetIndex(); check(Index < uint32(PeerGroups.Num())); PeerGroups[Index].Unwait(); Waiters.RemoveAtSwap(i, EAllowShrinking::No); --n, --i, ++Count; } check(Count == Result); return Result; } //////////////////////////////////////////////////////////////////////////////// void FHostGroup::Tick(FTickState& State) { State.Work = &Work; if (BusyCount = Work.HasWork(); BusyCount) { Work.TickCancels(State); // Get available work out on idle sockets as soon as possible for (FPeerGroup& Group : PeerGroups) { if (!Work.HasWork()) { break; } Group.TickSend(State, Host, Poller); } } // Wait on the groups that are if (int32 Result = Wait(State); Result < 0) { const char* Reason = (Result == MIN_int32) ? "FailTimeout hit" : "poll() returned an unexpected error"; for (FPeerGroup& Group : PeerGroups) { Group.Fail(State, Reason); } return; } // Tick everything, starting with groups that are maybe closest to finishing for (FPeerGroup& Group : PeerGroups) { BusyCount += (Group.Tick(State) == true); } } //////////////////////////////////////////////////////////////////////////////// void FHostGroup::AddActivity(FActivity* Activity) { Work.AddActivity(Activity); } // {{{1 event-loop ............................................................. //////////////////////////////////////////////////////////////////////////////// static const FEventLoop::FRequestParams GDefaultParams; //////////////////////////////////////////////////////////////////////////////// class FEventLoop::FImpl { public: ~FImpl(); uint32 Tick(int32 PollTimeoutMs=0); bool IsIdle() const; void Throttle(uint32 KiBPerSec); void SetFailTimeout(int32 TimeoutMs); void Cancel(FTicket Ticket); FRequest Request(FAnsiStringView Method, FAnsiStringView Path, FActivity* Activity); FTicket Send(FActivity* Activity); private: void ReceiveWork(); FCriticalSection Lock; std::atomic FreeSlots = ~0ull; std::atomic Cancels = 0; uint64 PrevFreeSlots = ~0ull; FActivity* Pending = nullptr; FThrottler Throttler; TArray Groups; int32 FailTimeoutMs = GIdleMs; uint32 BusyCount = 0; }; //////////////////////////////////////////////////////////////////////////////// FEventLoop::FImpl::~FImpl() { check(BusyCount == 0); } //////////////////////////////////////////////////////////////////////////////// FRequest FEventLoop::FImpl::Request( FAnsiStringView Method, FAnsiStringView Path, FActivity* Activity) { Trace(Activity, ETrace::ActivityCreate, 0); Activity_ChangeState(Activity, FActivity::EState::Build); if (Path.Len() == 0) { Path = "/"; } Activity->NoContent = (Method == "HEAD"); auto& Buffer = Activity->Buffer; Buffer.Begin(Method, Path); Buffer.AddHeader("Host", Activity->Host->GetHostName()); // HTTP/1.1 is persistent by default thus "Connection" header isn't required if (!Activity->IsKeepAlive) { Buffer.AddHeader("Connection", "close"); } FRequest Ret; Ret.Ptr = Activity; return Ret; } //////////////////////////////////////////////////////////////////////////////// FTicket FEventLoop::FImpl::Send(FActivity* Activity) { Trace(Activity, ETrace::RequestBegin); Activity->Buffer.End(); Activity_ChangeState(Activity, FActivity::EState::Send); uint64 Slot; { FScopeLock _(&Lock); for (;; FPlatformProcess::SleepNoStats(0.0f)) { uint64 FreeSlotsLoad = FreeSlots.load(std::memory_order_relaxed); if (!FreeSlotsLoad) { // we don't handle oversubscription at the moment. Could return // activity to Reqeust and return a 0 ticket. check(false); } Slot = -int64(FreeSlotsLoad) & FreeSlotsLoad; if (FreeSlots.compare_exchange_weak(FreeSlotsLoad, FreeSlotsLoad - Slot, std::memory_order_relaxed)) { break; } } Activity->Slot = int8(63 - FMath::CountLeadingZeros64(Slot)); // This puts pending requests in reverse order of when they were made // but this will be undone when ReceiveWork() traverses the list Activity->Next = Pending; Pending = Activity; } return Slot; } //////////////////////////////////////////////////////////////////////////////// bool FEventLoop::FImpl::IsIdle() const { return FreeSlots.load(std::memory_order_relaxed) == ~0ull; } //////////////////////////////////////////////////////////////////////////////// void FEventLoop::FImpl::Throttle(uint32 KiBPerSec) { Throttler.SetLimit(KiBPerSec); } //////////////////////////////////////////////////////////////////////////////// void FEventLoop::FImpl::SetFailTimeout(int32 TimeoutMs) { if (TimeoutMs > 0) { FailTimeoutMs = TimeoutMs; } else { FailTimeoutMs = GIdleMs; // Reset to default } } //////////////////////////////////////////////////////////////////////////////// void FEventLoop::FImpl::Cancel(FTicket Ticket) { Cancels.fetch_or(Ticket, std::memory_order_relaxed); } //////////////////////////////////////////////////////////////////////////////// void FEventLoop::FImpl::ReceiveWork() { uint64 FreeSlotsLoad = FreeSlots.load(std::memory_order_relaxed); if (FreeSlots == PrevFreeSlots) { return; } PrevFreeSlots = FreeSlotsLoad; // Fetch the pending activities from out in the wild FActivity* Activity = nullptr; { FScopeLock _(&Lock); Swap(Activity, Pending); } // Pending is in the reverse of the order that requests were made FActivity* Reverse = nullptr; for (FActivity* Next; Activity != nullptr; Activity = Next) { Next = Activity->Next; Activity->Next = Reverse; Reverse = Activity; } Activity = Reverse; // Group activities by their host. for (FActivity* Next; Activity != nullptr; Activity = Next) { Next = Activity->Next; Activity->Next = nullptr; FHost& Host = *(Activity->Host); auto Pred = [&Host] (const FHostGroup& Lhs) { return &Lhs.GetHost() == &Host; }; FHostGroup* Group = Groups.FindByPredicate(Pred); if (Group == nullptr) { Group = &(Groups.Emplace_GetRef(Host)); } Group->AddActivity(Activity); ++BusyCount; } } //////////////////////////////////////////////////////////////////////////////// uint32 FEventLoop::FImpl::Tick(int32 PollTimeoutMs) { TRACE_CPUPROFILER_EVENT_SCOPE(IasHttp::Tick); ReceiveWork(); // We limit recv sizes as a way to control bandwidth use. int32 RecvAllowance = Throttler.GetAllowance(); if (RecvAllowance <= 0) { if (PollTimeoutMs == 0) { return BusyCount; } int32 ThrottleWaitMs = -RecvAllowance; if (PollTimeoutMs > 0) { ThrottleWaitMs = FMath::Min(ThrottleWaitMs, PollTimeoutMs); } FPlatformProcess::SleepNoStats(float(ThrottleWaitMs) / 1000.0f); RecvAllowance = Throttler.GetAllowance(); if (RecvAllowance <= 0) { return BusyCount; } } uint64 CancelsLoad = Cancels.load(std::memory_order_relaxed); uint32 NowMs; { // 4.2MM seconds will give us 50 days of uptime. static uint64 Freq = 0; static uint64 Base = 0; if (Freq == 0) { Freq = uint64(1.0 / FPlatformTime::GetSecondsPerCycle64()); Base = FPlatformTime::Cycles64(); } uint64 NowBig = ((FPlatformTime::Cycles64() - Base) * 1000) / Freq; NowMs = uint32(NowBig); check(NowMs == NowBig); } // Tick groups and then remove ones that are idle FTickState TickState = { .DoneList = nullptr, .Cancels = CancelsLoad, .RecvAllowance = RecvAllowance, .PollTimeoutMs = PollTimeoutMs, .FailTimeoutMs = FailTimeoutMs, .NowMs = NowMs, }; for (FHostGroup& Group : Groups) { Group.Tick(TickState); } for (uint32 i = 0, n = Groups.Num(); i < n; ++i) { FHostGroup& Group = Groups[i]; if (Group.IsBusy()) { continue; } Groups.RemoveAtSwap(i, EAllowShrinking::No); --n, --i; } Throttler.ReturnUnused(RecvAllowance); uint64 ReturnedSlots = 0; for (FActivity* Activity = TickState.DoneList; Activity != nullptr;) { FActivity* Next = Activity->Next; ReturnedSlots |= (1ull << Activity->Slot); Activity_Free(Activity); --BusyCount; Activity = Next; } uint32 BusyBias = 0; if (ReturnedSlots) { uint64 LatestFree = FreeSlots.fetch_add(ReturnedSlots, std::memory_order_relaxed); BusyBias += (LatestFree != PrevFreeSlots); PrevFreeSlots += ReturnedSlots; } if (CancelsLoad) { Cancels.fetch_and(~CancelsLoad, std::memory_order_relaxed); } return BusyCount + BusyBias; } //////////////////////////////////////////////////////////////////////////////// FEventLoop::FEventLoop() { Impl = new FEventLoop::FImpl(); Trace(Impl, ETrace::LoopCreate); } FEventLoop::~FEventLoop() { Trace(Impl, ETrace::LoopDestroy); delete Impl; } uint32 FEventLoop::Tick(int32 PollTimeoutMs) { return Impl->Tick(PollTimeoutMs); } bool FEventLoop::IsIdle() const { return Impl->IsIdle(); } void FEventLoop::Cancel(FTicket Ticket) { return Impl->Cancel(Ticket); } void FEventLoop::Throttle(uint32 KiBPerSec) { return Impl->Throttle(KiBPerSec); } void FEventLoop::SetFailTimeout(int32 Ms) { return Impl->SetFailTimeout(Ms); } //////////////////////////////////////////////////////////////////////////////// FRequest FEventLoop::Request( FAnsiStringView Method, FAnsiStringView Url, const FRequestParams* Params) { // Parse the URL into its components FUrlOffsets UrlOffsets; if (ParseUrl(Url, UrlOffsets) < 0) { return FRequest(); } FAnsiStringView HostName = UrlOffsets.HostName.Get(Url); uint32 Port = 0; if (UrlOffsets.Port) { FAnsiStringView PortView = UrlOffsets.Port.Get(Url); Port = uint32(CrudeToInt(PortView)); } FAnsiStringView Path; if (UrlOffsets.Path > 0) { Path = Url.Mid(UrlOffsets.Path); } // Create an activity and an emphemeral host Params = (Params != nullptr) ? Params : &GDefaultParams; FCertRootsRef VerifyCert = FCertRoots::NoTls(); if (UrlOffsets.SchemeLength == 5) { if (VerifyCert = Params->VerifyCert; VerifyCert == ECertRootsRefType::None) { VerifyCert = FCertRoots::Default(); } check(VerifyCert != ECertRootsRefType::None); } uint32 BufferSize = Params->BufferSize; BufferSize = (BufferSize >= 128) ? BufferSize : 128; BufferSize += sizeof(FHost) + HostName.Len(); FActivity* Activity = Activity_Alloc(BufferSize); FBuffer& Buffer = Activity->Buffer; FHost* Host = Activity->Host = Buffer.Alloc(); Activity->IsKeepAlive = 0; Activity->bFollow30x = (Params->bAutoRedirect == true); Activity->bAllowChunked = (Params->bAllowChunked == true); uint32 HostNameLength = HostName.Len(); char* HostNamePtr = Buffer.Alloc(HostNameLength + 1); Buffer.Fix(); Activity_SetScore(Activity, Params->ContentSizeEst); memcpy(HostNamePtr, HostName.GetData(), HostNameLength); HostNamePtr[HostNameLength] = '\0'; new (Host) FHost({ .HostName = HostNamePtr, .Port = Port, .VerifyCert = VerifyCert, }); return Impl->Request(Method, Path, Activity); } //////////////////////////////////////////////////////////////////////////////// FRequest FEventLoop::Request( FAnsiStringView Method, FAnsiStringView Path, FConnectionPool& Pool, const FRequestParams* Params) { check(Pool.Ptr != nullptr); check(Params == nullptr || Params->VerifyCert == ECertRootsRefType::None); // add cert to FConPool instead Params = (Params != nullptr) ? Params : &GDefaultParams; uint32 BufferSize = Params->BufferSize; BufferSize = (BufferSize >= 128) ? BufferSize : 128; FActivity* Activity = Activity_Alloc(BufferSize); Activity->Host = Pool.Ptr; Activity->IsKeepAlive = 1; Activity->bFollow30x = (Params->bAutoRedirect == true); Activity->bAllowChunked = (Params->bAllowChunked == true); Activity->LengthScore = 0; Activity_SetScore(Activity, Params->ContentSizeEst); return Impl->Request(Method, Path, Activity); } //////////////////////////////////////////////////////////////////////////////// bool FEventLoop::Redirect(const FTicketStatus& Status, FTicketSink& OuterSink) { const FResponse& Response = Status.GetResponse(); switch (Response.GetStatusCode()) { case 301: // RedirectMoved case 302: // RedirectFound case 307: // RedirectTemp case 308: break; // RedirectPerm default: return false; } FAnsiStringView Location = Response.GetHeader("Location"); if (Location.IsEmpty()) { // todo: turn source activity into an error? return false; } check(Response.GetContentLength() == 0); // should we ever hit this, we'll fix it const auto& Activity = (FActivity&)Response; // todo: yuk // Original method should remain unchanged FAnsiStringView Method = Activity.Buffer.GetMethod(); check(!Method.IsEmpty()); FRequest ForwardRequest; if (!Location.StartsWith("http://") && !Location.StartsWith("https://")) { if (Location[0] != '/') { return false; } FHost& Host = *(Activity.Host); TAnsiStringBuilder<256> Url; Url << ((Host.GetVerifyCert() != ECertRootsRefType::None) ? "https" : "http"); Url << "://"; Url << Host.GetHostName(); Url << ":" << Host.GetPort(); Url << Location; FRequestParams RequestParams = { .VerifyCert = Host.GetVerifyCert(), }; new (&ForwardRequest) FRequest(Request(Method, Url, &RequestParams)); } else { new (&ForwardRequest) FRequest(Request(Method, Location)); } // Transfer original request headers check(Activity.State == FActivity::EState::RecvMessage); Activity.Buffer.EnumerateHeaders([&ForwardRequest] (FAnsiStringView Name, FAnsiStringView Value) { if (Name != "Host" && Name != "Connection") { ForwardRequest.Header(Name, Value); } return true; }); // Send the request Send(MoveTemp(ForwardRequest), MoveTemp(OuterSink), Status.GetParam()); // todo: activity slots should be swapped so original slot matches ticket return true; } //////////////////////////////////////////////////////////////////////////////// FTicket FEventLoop::Send(FRequest&& Request, FTicketSink Sink, UPTRINT SinkParam) { FActivity* Activity = nullptr; Swap(Activity, Request.Ptr); Activity->SinkParam = SinkParam; Activity->Sink = MoveTemp(Sink); // Intercept sink calls to catch 30x status codes and follow them if (Activity->bFollow30x) { auto RedirectSink = [ this, OuterSink=MoveTemp(Activity->Sink) ] (const FTicketStatus& Status) mutable { if (Status.GetId() == FTicketStatus::EId::Response) { if (Redirect(Status, OuterSink)) { return; } } if (OuterSink) { return OuterSink(Status); } }; Activity->Sink = RedirectSink; } return Impl->Send(Activity); } // }}} } // namespace UE::IoStore::HTTP