zero copy logic for abyss::http::ConnImpl

pull/35/head
Jeff Becker 6 years ago
parent f873b18036
commit e8bbc10838
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -10,6 +10,11 @@ namespace abyss
{
namespace http
{
#if __cplusplus >= 201703L
typedef std::string_view string_view;
#else
typedef std::string string_view;
#endif
struct RequestHeader
{
typedef std::unordered_multimap< std::string, std::string > Headers_t;
@ -76,16 +81,16 @@ namespace abyss
}
bool
ProcessMethodLine(std::string& line)
ProcessMethodLine(string_view line)
{
// TODO: implement me
auto idx = line.find_first_of(' ');
if(idx == std::string::npos)
if(idx == string_view::npos)
return false;
m_Header.Method = line.substr(0, idx);
line = line.substr(idx + 1);
idx = line.find_first_of(' ');
if(idx == std::string::npos)
if(idx == string_view::npos)
return false;
m_Header.Path = line.substr(0, idx);
m_State = eReadHTTPHeaders;
@ -93,21 +98,26 @@ namespace abyss
}
bool
ShouldProcessHeader(const std::string& name) const
ShouldProcessHeader(const string_view& name) const
{
// TODO: header whitelist
return true;
}
bool
ProcessHeaderLine(std::string& line)
ProcessHeaderLine(string_view line)
{
// TODO: implement me
if(line.size() == 0)
{
// end of headers
m_State = eReadHTTPBody;
return true;
}
auto idx = line.find_first_of(':');
if(idx == std::string::npos)
if(idx == string_view::npos)
return false;
std::string header = line.substr(0, idx);
std::string val = line.substr(idx);
string_view header = line.substr(0, idx);
string_view val = line.substr(idx);
// to lowercase
std::transform(header.begin(), header.end(), header.begin(),
[](char ch) -> char { return ::tolower(ch); });
@ -127,7 +137,6 @@ namespace abyss
message.c_str());
if(sz > 0)
{
llarp::LogInfo("HTTP ", code, " ", message);
return llarp_tcp_conn_async_write(_conn, buf, sz);
}
else
@ -163,7 +172,6 @@ namespace abyss
bool
FeedBody(const char* buf, size_t sz)
{
llarp::LogInfo("HTTP ", m_Header.Method, " ", m_Header.Path, " ", sz);
if(sz == 0)
{
return WriteResponseSimple(400, "Bad Request", "text/plain", "nope");
@ -187,42 +195,36 @@ namespace abyss
m_LastActive = llarp_time_now_ms();
if(m_State < eReadHTTPBody)
{
if(strstr(buf, "\r\n") == nullptr)
{
// probably too big or small
return false;
}
m_ReadBuf << std::string(buf, sz);
std::string line;
while(std::getline(m_ReadBuf, line, '\n'))
const char* end = strstr(buf, "\r\n");
while(end)
{
if(line[0] == '\r')
string_view line(buf, end);
switch(m_State)
{
m_State = eReadHTTPBody;
line = m_ReadBuf.str();
const char* ptr = strstr(line.c_str(), "\r\n\r\n");
if(ptr == nullptr)
return false;
line = std::string(ptr + 4);
m_ReadBuf.clear();
return FeedBody(line.c_str(), line.size());
case eReadHTTPMethodLine:
if(!ProcessMethodLine(line))
return false;
sz -= line.size();
break;
case eReadHTTPHeaders:
if(!ProcessHeaderLine(line))
return false;
sz -= line.size();
break;
default:
end = nullptr;
break;
}
auto pos = line.find_first_of('\r');
if(pos == std::string::npos)
if(end)
{
return false;
buf = end + (2 * sizeof(char));
end = strstr(buf, "\r\n");
}
line = line.substr(0, pos);
if(!FeedLine(line))
return false;
}
m_ReadBuf.str(line);
}
else
if(m_State == eReadHTTPBody)
return FeedBody(buf, sz);
return true;
return false;
}
static void
@ -275,7 +277,7 @@ namespace abyss
_conn = nullptr;
}
}
};
}; // namespace http
IRPCHandler::IRPCHandler(ConnImpl* conn) : m_Impl(conn)
{

@ -182,9 +182,8 @@ namespace llarp
{
if(_shouldClose)
{
if(tcp->closed)
if(tcp && tcp->closed)
tcp->closed(tcp);
delete tcp;
return false;
}
else if(tcp->tick)

@ -91,8 +91,7 @@ namespace llarp
}
/// for tun
ev_io(int f, LossyWriteQueue_t* lossyqueue)
: fd(f), m_LossyWriteQueue(lossyqueue)
ev_io(int f, LossyWriteQueue_t* q) : fd(f), m_LossyWriteQueue(q)
{
}
@ -223,19 +222,28 @@ namespace llarp
bool _shouldClose = false;
llarp_tcp_conn* tcp;
tcp_conn(int fd, llarp_tcp_conn* conn)
: ev_io(fd, new LosslessWriteQueue_t()), tcp(conn)
: ev_io(fd, new LosslessWriteQueue_t{}), tcp(conn)
{
}
virtual ~tcp_conn()
{
delete tcp;
}
virtual int
do_write(const void* buf, size_t sz)
{
if(_shouldClose)
return -1;
return ::send(fd, buf, sz, MSG_NOSIGNAL); // ignore sigpipe
}
int
read(void* buf, size_t sz)
{
if(_shouldClose)
return -1;
ssize_t amount = ::read(fd, buf, sz);
if(amount > 0)
{
@ -245,7 +253,7 @@ namespace llarp
else
{
// error
llarp_tcp_conn_close(tcp);
_shouldClose = true;
return -1;
}
return 0;
@ -344,8 +352,8 @@ struct llarp_ev_loop
void
tick_listeners()
{
auto itr = handlers.begin();
while(itr != handlers.end())
auto itr = handlers.cbegin();
while(itr != handlers.cend())
{
if((*itr)->tick())
++itr;

Loading…
Cancel
Save