[curl] add a curl looper to handle url requests

pull/254/head
Timothy Stack 9 years ago
parent 248dd78f63
commit f286950854

@ -4,6 +4,7 @@ lnav v0.8.0:
* Integration with "papertrailapp.com" for querying and tailing * Integration with "papertrailapp.com" for querying and tailing
server log and syslog messages. See the Papertrail section in server log and syslog messages. See the Papertrail section in
the online help for more details. the online help for more details.
* Remote files can be opened when linked with libcurl.
* Log formats that are "containers" for other log formats, like * Log formats that are "containers" for other log formats, like
syslog, are now supported. See the online help for more syslog, are now supported. See the online help for more
information. information.

@ -24,6 +24,7 @@ Lnav requires the following software packages:
zlib - The zlib compression library. zlib - The zlib compression library.
bz2 - The bzip2 compression library. bz2 - The bzip2 compression library.
re2c - The re2c scanner generator. re2c - The re2c scanner generator.
libcurl - The cURL library for downloading files from URLs.
INSTALLATION INSTALLATION

@ -33,9 +33,15 @@ test_num=0
lnav_test="${top_builddir}/src/lnav-test" lnav_test="${top_builddir}/src/lnav-test"
export lnav_test export lnav_test
lnav="${top_builddir}/src/lnav"
export lnav
LNAV_LOG_PATH="${top_builddir}/test/test.log" LNAV_LOG_PATH="${top_builddir}/test/test.log"
export LNAV_LOG_PATH export LNAV_LOG_PATH
SFTP_TEST_URL="@SFTP_TEST_URL@"
export SFTP_TEST_URL
## BEGIN Functions ## BEGIN Functions
LAST_TEST="" LAST_TEST=""

@ -44,6 +44,8 @@ CPPFLAGS="$CPPFLAGS -D_ISOC99_SOURCE -D__STDC_LIMIT_MACROS"
# CFLAGS=`echo $CFLAGS | sed 's/-O2//g'` # CFLAGS=`echo $CFLAGS | sed 's/-O2//g'`
# CXXFLAGS=`echo $CXXFLAGS | sed 's/-O2//g'` # CXXFLAGS=`echo $CXXFLAGS | sed 's/-O2//g'`
AC_ARG_VAR(SFTP_TEST_URL)
AC_ARG_ENABLE([profiling], AC_ARG_ENABLE([profiling],
AS_HELP_STRING([--enable-profiling], AS_HELP_STRING([--enable-profiling],
[Compile with gprof(1) profiling support])) [Compile with gprof(1) profiling support]))

@ -4,6 +4,7 @@ set(diag_STAT_SRCS
bookmarks.cc bookmarks.cc
collation-functions.cc collation-functions.cc
command_executor.cc command_executor.cc
curl_looper.cc
db_sub_source.cc db_sub_source.cc
environ_vtab.cc environ_vtab.cc
extension-functions.c extension-functions.c
@ -81,6 +82,7 @@ set(diag_STAT_SRCS
command_executor.hh command_executor.hh
concise_index.hh concise_index.hh
column_namer.hh column_namer.hh
curl_looper.hh
field_overlay_source.hh field_overlay_source.hh
filter_observer.hh filter_observer.hh
format-text-files.hh format-text-files.hh
@ -98,6 +100,7 @@ set(diag_STAT_SRCS
plain_text_source.hh plain_text_source.hh
pretty_printer.hh pretty_printer.hh
ptimec.hh ptimec.hh
pthreadpp.hh
readline_callbacks.hh readline_callbacks.hh
readline_possibilities.hh readline_possibilities.hh
sequence_sink.hh sequence_sink.hh
@ -109,6 +112,7 @@ set(diag_STAT_SRCS
textfile_sub_source.hh textfile_sub_source.hh
time_T.hh time_T.hh
top_status_source.hh top_status_source.hh
url_loader.hh
views_vtab.hh views_vtab.hh
yajl/api/yajl_common.h yajl/api/yajl_common.h

@ -102,6 +102,7 @@ noinst_HEADERS = \
column_namer.hh \ column_namer.hh \
command_executor.hh \ command_executor.hh \
concise_index.hh \ concise_index.hh \
curl_looper.hh \
data_scanner.hh \ data_scanner.hh \
data_scanner_re.re \ data_scanner_re.re \
data_parser.hh \ data_parser.hh \
@ -143,6 +144,7 @@ noinst_HEADERS = \
piper_proc.hh \ piper_proc.hh \
plain_text_source.hh \ plain_text_source.hh \
pretty_printer.hh \ pretty_printer.hh \
pthreadpp.hh \
ptimec.hh \ ptimec.hh \
readline_callbacks.hh \ readline_callbacks.hh \
readline_curses.hh \ readline_curses.hh \
@ -166,6 +168,7 @@ noinst_HEADERS = \
time_T.hh \ time_T.hh \
timer.hh \ timer.hh \
top_status_source.hh \ top_status_source.hh \
url_loader.hh \
view_curses.hh \ view_curses.hh \
views_vtab.hh \ views_vtab.hh \
vt52_curses.hh \ vt52_curses.hh \
@ -196,6 +199,7 @@ libdiag_a_SOURCES = \
bookmarks.cc \ bookmarks.cc \
collation-functions.cc \ collation-functions.cc \
command_executor.cc \ command_executor.cc \
curl_looper.cc \
db_sub_source.cc \ db_sub_source.cc \
environ_vtab.cc \ environ_vtab.cc \
extension-functions.c \ extension-functions.c \

@ -0,0 +1,258 @@
/**
* Copyright (c) 2015, Timothy Stack
*
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* * Neither the name of Timothy Stack nor the names of its contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
* ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
* @file curl_looper.cc
*/
#include "config.h"
#ifdef HAVE_LIBCURL
#include <curl/multi.h>
#include "curl_looper.hh"
using namespace std;
struct curl_request_eq {
curl_request_eq(const std::string &name) : cre_name(name) {
};
bool operator()(const curl_request *cr) const {
return this->cre_name == cr->get_name();
};
bool operator()(const pair<mstime_t, curl_request *> &pair) const {
return this->cre_name == pair.second->get_name();
};
const std::string &cre_name;
};
int curl_request::debug_cb(CURL *handle,
curl_infotype type,
char *data,
size_t size,
void *userp) {
curl_request *cr = (curl_request *) userp;
if (type == CURLINFO_TEXT) {
while (size > 0 && isspace(data[size - 1])) {
size -= 1;
}
log_debug("%s:%.*s", cr->get_name().c_str(), size, data);
}
return 0;
}
void *curl_looper::trampoline(void *arg)
{
curl_looper *cl = (curl_looper *) arg;
return cl->run();
}
void *curl_looper::run()
{
log_info("curl looper thread started");
while (this->cl_looping) {
this->loop_body();
}
log_info("curl looper thread exiting");
return NULL;
}
void curl_looper::loop_body()
{
mstime_t current_time = getmstime();
int timeout = this->compute_timeout(current_time);
if (this->cl_handle_to_request.empty()) {
mutex_guard mg(this->cl_mutex);
if (this->cl_new_requests.empty() && this->cl_close_requests.empty()) {
mstime_t deadline = current_time + timeout;
struct timespec ts;
ts.tv_sec = deadline / 1000ULL;
ts.tv_nsec = (deadline % 1000ULL) * 1000 * 1000;
log_trace("no requests in progress, waiting %d ms for new ones",
timeout);
pthread_cond_timedwait(&this->cl_cond, &this->cl_mutex, &ts);
}
}
this->perform_io();
this->check_for_finished_requests();
this->check_for_new_requests();
this->requeue_requests(current_time + 5);
}
void curl_looper::perform_io()
{
if (this->cl_handle_to_request.empty()) {
return;
}
mstime_t current_time = getmstime();
int timeout = this->compute_timeout(current_time);
int running_handles;
curl_multi_wait(this->cl_curl_multi,
NULL,
0,
timeout,
NULL);
curl_multi_perform(this->cl_curl_multi, &running_handles);
}
void curl_looper::requeue_requests(mstime_t up_to_time)
{
while (!this->cl_poll_queue.empty() &&
this->cl_poll_queue.front().first <= up_to_time) {
curl_request *cr = this->cl_poll_queue.front().second;
log_debug("%s:polling request is ready again -- %p",
cr->get_name().c_str(), cr);
this->cl_handle_to_request[cr->get_handle()] = cr;
curl_multi_add_handle(this->cl_curl_multi, cr->get_handle());
this->cl_poll_queue.erase(this->cl_poll_queue.begin());
}
}
void curl_looper::check_for_new_requests() {
mutex_guard mg(this->cl_mutex);
while (!this->cl_new_requests.empty()) {
curl_request *cr = this->cl_new_requests.back();
log_info("%s:new curl request %p",
cr->get_name().c_str(),
cr);
this->cl_handle_to_request[cr->get_handle()] = cr;
curl_multi_add_handle(this->cl_curl_multi, cr->get_handle());
this->cl_new_requests.pop_back();
}
while (!this->cl_close_requests.empty()) {
const std::string &name = this->cl_close_requests.back();
vector<curl_request *>::iterator all_iter = find_if(
this->cl_all_requests.begin(),
this->cl_all_requests.end(),
curl_request_eq(name));
log_info("attempting to close request -- %s", name.c_str());
if (all_iter != this->cl_all_requests.end()) {
map<CURL *, curl_request *>::iterator act_iter;
vector<pair<mstime_t, curl_request *> >::iterator poll_iter;
curl_request *cr = *all_iter;
log_info("%s:closing request -- %p",
cr->get_name().c_str(), cr);
(*all_iter)->close();
act_iter = this->cl_handle_to_request.find(cr);
if (act_iter != this->cl_handle_to_request.end()) {
this->cl_handle_to_request.erase(act_iter);
curl_multi_remove_handle(this->cl_curl_multi,
cr->get_handle());
delete cr;
}
poll_iter = find_if(this->cl_poll_queue.begin(),
this->cl_poll_queue.end(),
curl_request_eq(name));
if (poll_iter != this->cl_poll_queue.end()) {
this->cl_poll_queue.erase(poll_iter);
}
this->cl_all_requests.erase(all_iter);
}
else {
log_error("Unable to find request with the name -- %s",
name.c_str());
}
this->cl_close_requests.pop_back();
pthread_cond_broadcast(&this->cl_cond);
}
}
void curl_looper::check_for_finished_requests()
{
CURLMsg *msg;
int msgs_left;
while ((msg = curl_multi_info_read(this->cl_curl_multi, &msgs_left)) != NULL) {
if (msg->msg != CURLMSG_DONE) {
continue;
}
CURL *easy = msg->easy_handle;
map<CURL *, curl_request *>::iterator iter = this->cl_handle_to_request.find(easy);
curl_multi_remove_handle(this->cl_curl_multi, easy);
if (iter != this->cl_handle_to_request.end()) {
curl_request *cr = iter->second;
long delay_ms;
this->cl_handle_to_request.erase(iter);
delay_ms = cr->complete(msg->data.result);
if (delay_ms < 0) {
vector<curl_request *>::iterator all_iter;
log_info("%s:curl_request %p finished, deleting...",
cr->get_name().c_str(), cr);
{
mutex_guard mg(this->cl_mutex);
all_iter = find(this->cl_all_requests.begin(),
this->cl_all_requests.end(),
cr);
if (all_iter != this->cl_all_requests.end()) {
this->cl_all_requests.erase(all_iter);
}
pthread_cond_broadcast(&cl_cond);
}
delete cr;
}
else {
log_debug("%s:curl_request %p is polling, requeueing in %d",
cr->get_name().c_str(),
cr,
delay_ms);
this->cl_poll_queue.push_back(
make_pair(getmstime() + delay_ms, cr));
sort(this->cl_poll_queue.begin(), this->cl_poll_queue.end());
}
}
}
}
#endif

@ -0,0 +1,240 @@
/**
* Copyright (c) 2015, Timothy Stack
*
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* * Neither the name of Timothy Stack nor the names of its contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
* ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
* @file curl_looper.hh
*/
#ifndef curl_looper_hh
#define curl_looper_hh
#include <map>
#include <string>
#include <vector>
#ifndef HAVE_LIBCURL
typedef int CURLcode;
class curl_request {
public:
curl_request(const std::string &name) {
};
};
class curl_looper {
public:
void start() { };
void add_request(curl_request *cr) { };
void close_request(const std::string &name) { };
void process_all() { };
};
#else
#include <curl/curl.h>
#include "auto_mem.hh"
#include "lnav_log.hh"
#include "lnav_util.hh"
#include "pthreadpp.hh"
class curl_request {
public:
curl_request(const std::string &name)
: cr_name(name),
cr_open(true),
cr_handle(curl_easy_cleanup),
cr_completions(0) {
this->cr_handle.reset(curl_easy_init());
curl_easy_setopt(this->cr_handle, CURLOPT_NOSIGNAL, 1);
curl_easy_setopt(this->cr_handle, CURLOPT_ERRORBUFFER, this->cr_error_buffer);
curl_easy_setopt(this->cr_handle, CURLOPT_DEBUGFUNCTION, debug_cb);
curl_easy_setopt(this->cr_handle, CURLOPT_DEBUGDATA, this);
curl_easy_setopt(this->cr_handle, CURLOPT_VERBOSE, 1);
};
virtual ~curl_request() {
};
const std::string &get_name() const {
return this->cr_name;
};
virtual void close() {
this->cr_open = false;
};
bool is_open() {
return this->cr_open;
};
CURL *get_handle() const {
return this->cr_handle;
};
int get_completions() const {
return this->cr_completions;
};
virtual long complete(CURLcode result) {
double total_time = 0, download_size = 0, download_speed = 0;
this->cr_completions += 1;
curl_easy_getinfo(this->cr_handle, CURLINFO_TOTAL_TIME, &total_time);
log_debug("%s: total_time=%f", this->cr_name.c_str(), total_time);
curl_easy_getinfo(this->cr_handle, CURLINFO_SIZE_DOWNLOAD, &download_size);
log_debug("%s: download_size=%f", this->cr_name.c_str(), download_size);
curl_easy_getinfo(this->cr_handle, CURLINFO_SPEED_DOWNLOAD, &download_speed);
log_debug("%s: download_speed=%f", this->cr_name.c_str(), download_speed);
return -1;
};
protected:
static int debug_cb(CURL *handle,
curl_infotype type,
char *data,
size_t size,
void *userp);
const std::string cr_name;
bool cr_open;
auto_mem<CURL> cr_handle;
char cr_error_buffer[CURL_ERROR_SIZE];
int cr_completions;
};
class curl_looper {
public:
curl_looper()
: cl_started(false),
cl_looping(true),
cl_curl_multi(curl_multi_cleanup) {
this->cl_curl_multi.reset(curl_multi_init());
pthread_mutex_init(&this->cl_mutex, NULL);
pthread_cond_init(&this->cl_cond, NULL);
};
~curl_looper() {
this->stop();
pthread_cond_destroy(&this->cl_cond);
pthread_mutex_destroy(&this->cl_mutex);
}
void start() {
if (pthread_create(&this->cl_thread, NULL, trampoline, this) == 0) {
this->cl_started = true;
}
};
void stop() {
if (this->cl_started) {
void *result;
this->cl_looping = false;
{
mutex_guard mg(this->cl_mutex);
pthread_cond_broadcast(&this->cl_cond);
}
log_debug("waiting for curl_looper thread");
pthread_join(this->cl_thread, &result);
log_debug("curl_looper thread joined");
}
};
void process_all() {
this->check_for_new_requests();
this->requeue_requests(LONG_MAX);
while (!this->cl_handle_to_request.empty()) {
this->perform_io();
this->check_for_finished_requests();
}
};
void add_request(curl_request *cr) {
mutex_guard mg(this->cl_mutex);
require(cr != NULL);
this->cl_all_requests.push_back(cr);
this->cl_new_requests.push_back(cr);
pthread_cond_broadcast(&this->cl_cond);
};
void close_request(const std::string &name) {
mutex_guard mg(this->cl_mutex);
this->cl_close_requests.push_back(name);
pthread_cond_broadcast(&this->cl_cond);
};
private:
void *run();
void loop_body();
void perform_io();
void check_for_new_requests();
void check_for_finished_requests();
void requeue_requests(mstime_t up_to_time);
int compute_timeout(mstime_t current_time) const {
int retval = 1000;
if (!this->cl_poll_queue.empty()) {
retval = std::max(
1LL, this->cl_poll_queue.front().first - current_time);
}
ensure(retval > 0);
return retval;
};
static void *trampoline(void *arg);
bool cl_started;
pthread_t cl_thread;
volatile bool cl_looping;
auto_mem<CURLM> cl_curl_multi;
pthread_mutex_t cl_mutex;
pthread_cond_t cl_cond;
std::vector<curl_request *> cl_all_requests;
std::vector<curl_request *> cl_new_requests;
std::vector<std::string> cl_close_requests;
std::map<CURL *, curl_request *> cl_handle_to_request;
std::vector<std::pair<mstime_t, curl_request *> > cl_poll_queue;
};
#endif
#endif

@ -131,6 +131,7 @@
#include "hotkeys.hh" #include "hotkeys.hh"
#include "readline_possibilities.hh" #include "readline_possibilities.hh"
#include "field_overlay_source.hh" #include "field_overlay_source.hh"
#include "url_loader.hh"
using namespace std; using namespace std;
@ -1236,7 +1237,10 @@ static void expand_filename(string path, bool required)
{ {
static_root_mem<glob_t, globfree> gl; static_root_mem<glob_t, globfree> gl;
if (glob(path.c_str(), GLOB_NOCHECK, NULL, gl.inout()) == 0) { if (is_url(path.c_str())) {
return;
}
else if (glob(path.c_str(), GLOB_NOCHECK, NULL, gl.inout()) == 0) {
int lpc; int lpc;
if (gl->gl_pathc == 1 /*&& gl.gl_matchc == 0*/) { if (gl->gl_pathc == 1 /*&& gl.gl_matchc == 0*/) {
@ -2272,6 +2276,7 @@ int main(int argc, char *argv[])
case 'd': case 'd':
lnav_data.ld_debug_log_name = optarg; lnav_data.ld_debug_log_name = optarg;
lnav_log_level = LOG_LEVEL_TRACE;
break; break;
case 'a': case 'a':
@ -2561,6 +2566,14 @@ int main(int argc, char *argv[])
if (startswith(argv[lpc], "pt:")) { if (startswith(argv[lpc], "pt:")) {
lnav_data.ld_pt_search = argv[lpc]; lnav_data.ld_pt_search = argv[lpc];
} }
#ifdef HAVE_LIBCURL
else if (is_url(argv[lpc])) {
auto_ptr<url_loader> ul(new url_loader(argv[lpc]));
lnav_data.ld_file_names.insert(make_pair(argv[lpc], ul->copy_fd().release()));
lnav_data.ld_curl_looper.add_request(ul.release());
}
#endif
else if (is_glob(argv[lpc])) { else if (is_glob(argv[lpc])) {
lnav_data.ld_file_names.insert(make_pair(argv[lpc], -1)); lnav_data.ld_file_names.insert(make_pair(argv[lpc], -1));
} }
@ -2740,6 +2753,7 @@ int main(int argc, char *argv[])
log_info("Executing initial commands"); log_info("Executing initial commands");
execute_init_commands(msgs); execute_init_commands(msgs);
wait_for_pipers(); wait_for_pipers();
lnav_data.ld_curl_looper.process_all();
rebuild_indexes(false); rebuild_indexes(false);
for (msg_iter = msgs.begin(); for (msg_iter = msgs.begin();
@ -2799,6 +2813,8 @@ int main(int argc, char *argv[])
} }
} }
else { else {
lnav_data.ld_curl_looper.start();
init_session(); init_session();
log_info(" session_id=%s", lnav_data.ld_session_id.c_str()); log_info(" session_id=%s", lnav_data.ld_session_id.c_str());

@ -59,6 +59,7 @@
#include "piper_proc.hh" #include "piper_proc.hh"
#include "term_extra.hh" #include "term_extra.hh"
#include "ansi_scrubber.hh" #include "ansi_scrubber.hh"
#include "curl_looper.hh"
#include "papertrail_proc.hh" #include "papertrail_proc.hh"
/** The command modes that are available while viewing a file. */ /** The command modes that are available while viewing a file. */
@ -241,6 +242,8 @@ struct _lnav_data {
term_extra ld_term_extra; term_extra ld_term_extra;
input_state_tracker ld_input_state; input_state_tracker ld_input_state;
curl_looper ld_curl_looper;
}; };
extern struct _lnav_data lnav_data; extern struct _lnav_data lnav_data;

@ -47,6 +47,7 @@
#include "lnav_commands.hh" #include "lnav_commands.hh"
#include "session_data.hh" #include "session_data.hh"
#include "command_executor.hh" #include "command_executor.hh"
#include "url_loader.hh"
using namespace std; using namespace std;
@ -1187,7 +1188,20 @@ static string com_open(string cmdline, vector<string> &args)
auto_mem<char> abspath; auto_mem<char> abspath;
struct stat st; struct stat st;
if (is_glob(fn.c_str())) { if (is_url(fn.c_str())) {
#ifndef HAVE_LIBCURL
retval = "error: lnav was not compiled with libcurl";
#else
auto_ptr<url_loader> ul(new url_loader(fn));
lnav_data.ld_file_names.insert(make_pair(fn, ul->copy_fd().release()));
lnav_data.ld_curl_looper.add_request(ul.release());
lnav_data.ld_files_to_front.push_back(make_pair(fn, top));
retval = "info: opened URL";
#endif
}
else if (is_glob(fn.c_str())) {
lnav_data.ld_file_names.insert(make_pair(fn, -1)); lnav_data.ld_file_names.insert(make_pair(fn, -1));
retval = "info: watching -- " + fn; retval = "info: watching -- " + fn;
} }
@ -1273,6 +1287,9 @@ static string com_close(string cmdline, vector<string> &args)
} }
} }
if (!fn.empty()) { if (!fn.empty()) {
if (is_url(fn.c_str())) {
lnav_data.ld_curl_looper.close_request(fn);
}
lnav_data.ld_file_names.erase(make_pair(fn, -1)); lnav_data.ld_file_names.erase(make_pair(fn, -1));
lnav_data.ld_closed_files.insert(fn); lnav_data.ld_closed_files.insert(fn);
retval = "info: closed -- " + fn; retval = "info: closed -- " + fn;
@ -1714,6 +1731,18 @@ static string com_shexec(string cmdline, vector<string> &args)
return ""; return "";
} }
static string com_poll_now(string cmdline, vector<string> &args)
{
if (args.empty()) {
}
else {
lnav_data.ld_curl_looper.process_all();
}
return "";
}
static string com_redraw(string cmdline, vector<string> &args) static string com_redraw(string cmdline, vector<string> &args)
{ {
if (args.empty()) { if (args.empty()) {
@ -1776,5 +1805,6 @@ void init_lnav_commands(readline_context::command_map_t &cmd_map)
if (getenv("lnav_test") != NULL) { if (getenv("lnav_test") != NULL) {
cmd_map["rebuild"] = com_rebuild; cmd_map["rebuild"] = com_rebuild;
cmd_map["shexec"] = com_shexec; cmd_map["shexec"] = com_shexec;
cmd_map["poll-now"] = com_poll_now;
} }
} }

@ -42,6 +42,7 @@
#include <unistd.h> #include <unistd.h>
#include <signal.h> #include <signal.h>
#include <libgen.h> #include <libgen.h>
#include <pthread.h>
#ifdef HAVE_EXECINFO_H #ifdef HAVE_EXECINFO_H
#include <execinfo.h> #include <execinfo.h>
@ -68,6 +69,7 @@
#endif #endif
#include "lnav_log.hh" #include "lnav_log.hh"
#include "pthreadpp.hh"
static const size_t BUFFER_SIZE = 256 * 1024; static const size_t BUFFER_SIZE = 256 * 1024;
static const size_t MAX_LOG_LINE_SIZE = 2048; static const size_t MAX_LOG_LINE_SIZE = 2048;
@ -83,9 +85,10 @@ static const char *CRASH_MSG =
"=========================\n"; "=========================\n";
FILE *lnav_log_file; FILE *lnav_log_file;
lnav_log_level_t lnav_log_level; lnav_log_level_t lnav_log_level = LOG_LEVEL_DEBUG;
const char *lnav_log_crash_dir; const char *lnav_log_crash_dir;
const struct termios *lnav_log_orig_termios; const struct termios *lnav_log_orig_termios;
static pthread_mutex_t lnav_log_mutex = PTHREAD_MUTEX_INITIALIZER;
log_state_dumper::log_state_list log_state_dumper::DUMPER_LIST; log_state_dumper::log_state_list log_state_dumper::DUMPER_LIST;
@ -101,6 +104,7 @@ static struct {
}; };
static const char *LEVEL_NAMES[] = { static const char *LEVEL_NAMES[] = {
"T",
"D", "D",
"I", "I",
"W", "W",
@ -196,6 +200,8 @@ void log_msg(lnav_log_level_t level, const char *src_file, int line_number,
return; return;
} }
mutex_guard mg(lnav_log_mutex);
va_start(args, fmt); va_start(args, fmt);
gettimeofday(&curr_time, NULL); gettimeofday(&curr_time, NULL);
localtime_r(&curr_time.tv_sec, &localtm); localtime_r(&curr_time.tv_sec, &localtm);
@ -233,6 +239,8 @@ void log_msg_extra(const char *fmt, ...)
ssize_t rc; ssize_t rc;
char *line; char *line;
mutex_guard mg(lnav_log_mutex);
va_start(args, fmt); va_start(args, fmt);
line = log_alloc(); line = log_alloc();
rc = vsnprintf(line, MAX_LOG_LINE_SIZE - 1, fmt, args); rc = vsnprintf(line, MAX_LOG_LINE_SIZE - 1, fmt, args);
@ -248,6 +256,8 @@ void log_msg_extra_complete()
{ {
char *line; char *line;
mutex_guard mg(lnav_log_mutex);
line = log_alloc(); line = log_alloc();
line[0] = '\n'; line[0] = '\n';
log_ring.lr_length += 1; log_ring.lr_length += 1;

@ -46,6 +46,7 @@
#endif #endif
enum lnav_log_level_t { enum lnav_log_level_t {
LOG_LEVEL_TRACE,
LOG_LEVEL_DEBUG, LOG_LEVEL_DEBUG,
LOG_LEVEL_INFO, LOG_LEVEL_INFO,
LOG_LEVEL_WARNING, LOG_LEVEL_WARNING,
@ -113,6 +114,9 @@ extern enum lnav_log_level_t lnav_log_level;
#define log_debug(fmt...) \ #define log_debug(fmt...) \
log_msg_wrapper(LOG_LEVEL_DEBUG, fmt); log_msg_wrapper(LOG_LEVEL_DEBUG, fmt);
#define log_trace(fmt...) \
log_msg_wrapper(LOG_LEVEL_TRACE, fmt);
#define require(e) \ #define require(e) \
((void) ((e) ? 0 : __require (#e, __FILE__, __LINE__))) ((void) ((e) ? 0 : __require (#e, __FILE__, __LINE__)))
#define __require(e, file, line) \ #define __require(e, file, line) \

@ -41,9 +41,20 @@
#include "auto_fd.hh" #include "auto_fd.hh"
#include "lnav_util.hh" #include "lnav_util.hh"
#include "pcrepp.hh"
using namespace std; using namespace std;
bool is_url(const char *fn)
{
static pcrepp url_re("^(file|https?|ftps?||scp|sftp):");
pcre_context_static<30> pc;
pcre_input pi(fn);
return url_re.match(pc, pi);
}
std::string hash_string(const std::string &str) std::string hash_string(const std::string &str)
{ {
byte_array<2, uint64> hash; byte_array<2, uint64> hash;

@ -36,6 +36,7 @@
#include <math.h> #include <math.h>
#include <time.h> #include <time.h>
#include <sys/time.h>
#include <poll.h> #include <poll.h>
#include <sys/types.h> #include <sys/types.h>
@ -107,6 +108,16 @@ inline time_t hour_num(time_t ti)
std::string time_ago(time_t last_time); std::string time_ago(time_t last_time);
typedef int64_t mstime_t;
inline mstime_t getmstime() {
struct timeval tv;
gettimeofday(&tv, NULL);
return tv.tv_sec * 1000ULL + tv.tv_usec / 1000ULL;
}
#if SIZEOF_OFF_T == 8 #if SIZEOF_OFF_T == 8
#define FORMAT_OFF_T "%qd" #define FORMAT_OFF_T "%qd"
#elif SIZEOF_OFF_T == 4 #elif SIZEOF_OFF_T == 4
@ -173,6 +184,8 @@ inline bool is_glob(const char *fn)
strchr(fn, '[') != NULL); strchr(fn, '[') != NULL);
}; };
bool is_url(const char *fn);
inline bool startswith(const char *str, const char *prefix) inline bool startswith(const char *str, const char *prefix)
{ {
return strncmp(str, prefix, strlen(prefix)) == 0; return strncmp(str, prefix, strlen(prefix)) == 0;

@ -0,0 +1,51 @@
/**
* Copyright (c) 2015, Timothy Stack
*
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* * Neither the name of Timothy Stack nor the names of its contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
* ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
* @file pthreadpp.hh
*/
#ifndef pthreadpp_hh
#define pthreadpp_hh
#include <pthread.h>
class mutex_guard {
public:
mutex_guard(pthread_mutex_t &mutex) : mg_mutex(mutex) {
pthread_mutex_lock(&mutex);
};
~mutex_guard() {
pthread_mutex_unlock(&this->mg_mutex);
};
private:
pthread_mutex_t &mg_mutex;
};
#endif

@ -0,0 +1,143 @@
/**
* Copyright (c) 2015, Timothy Stack
*
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* * Neither the name of Timothy Stack nor the names of its contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
* ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef url_loader_hh
#define url_loader_hh
#ifdef HAVE_LIBCURL
#include <curl/curl.h>
class url_loader : public curl_request {
public:
url_loader(const std::string &url) : curl_request(url), ul_resume_offset(0) {
char piper_tmpname[PATH_MAX];
const char *tmpdir;
if ((tmpdir = getenv("TMPDIR")) == NULL) {
tmpdir = _PATH_VARTMP;
}
snprintf(piper_tmpname, sizeof(piper_tmpname),
"%s/lnav.url.XXXXXX",
tmpdir);
if ((this->ul_fd = mkstemp(piper_tmpname)) == -1) {
return;
}
unlink(piper_tmpname);
curl_easy_setopt(this->cr_handle, CURLOPT_URL, this->cr_name.c_str());
curl_easy_setopt(this->cr_handle, CURLOPT_WRITEFUNCTION, write_cb);
curl_easy_setopt(this->cr_handle, CURLOPT_WRITEDATA, this);
curl_easy_setopt(this->cr_handle, CURLOPT_FILETIME, 1);
};
int get_fd() const {
return this->ul_fd.get();
};
auto_fd copy_fd() const {
return this->ul_fd;
};
long complete(CURLcode result) {
curl_request::complete(result);
switch (result) {
case CURLE_OK:
break;
case CURLE_BAD_DOWNLOAD_RESUME:
break;
default:
log_error("%s:curl failure -- %ld %s",
this->cr_name.c_str(), result, curl_easy_strerror(result));
write(this->ul_fd, this->cr_error_buffer, strlen(this->cr_error_buffer));
return -1;
}
long file_time;
CURLcode rc;
rc = curl_easy_getinfo(this->cr_handle, CURLINFO_FILETIME, &file_time);
if (rc == CURLE_OK) {
time_t current_time;
time(&current_time);
if (file_time == -1 ||
(current_time - file_time) < FOLLOW_IF_MODIFIED_SINCE) {
char range[64];
struct stat st;
off_t start;
fstat(this->ul_fd, &st);
if (st.st_size > 0) {
start = st.st_size - 1;
this->ul_resume_offset = 1;
}
else {
start = 0;
this->ul_resume_offset = 0;
}
snprintf(range, sizeof(range), "%ld-", (long) start);
curl_easy_setopt(this->cr_handle, CURLOPT_RANGE, range);
return 2000;
}
else {
log_debug("URL was not recently modified, not tailing: %s",
this->cr_name.c_str());
}
}
else {
log_error("Could not get file time for URL: %s -- %s",
this->cr_name.c_str(), curl_easy_strerror(rc));
}
return -1;
};
private:
static const long FOLLOW_IF_MODIFIED_SINCE = 60 * 60;
static ssize_t write_cb(void *contents, size_t size, size_t nmemb, void *userp) {
url_loader *ul = (url_loader *) userp;
char *c_contents = (char *) contents;
ssize_t retval;
c_contents += ul->ul_resume_offset;
retval = write(ul->ul_fd, c_contents, (size * nmemb) - ul->ul_resume_offset);
retval += ul->ul_resume_offset;
ul->ul_resume_offset = 0;
return retval;
}
auto_fd ul_fd;
off_t ul_resume_offset;
};
#endif
#endif

@ -158,6 +158,7 @@ drive_sql_LDADD = \
$(PCRE_LIBS) \ $(PCRE_LIBS) \
$(CURSES_LIB) \ $(CURSES_LIB) \
$(READLINE_LIBS) \ $(READLINE_LIBS) \
$(LIBCURL) \
-lpcrecpp -lpcrecpp
slicer_SOURCES = slicer.cc slicer_SOURCES = slicer.cc
@ -169,6 +170,7 @@ scripty_LDADD = ../src/libdiag.a
dist_noinst_SCRIPTS = \ dist_noinst_SCRIPTS = \
parser_debugger.py \ parser_debugger.py \
test_cmds.sh \ test_cmds.sh \
test_curl.sh \
test_data_parser.sh \ test_data_parser.sh \
test_format_loader.sh \ test_format_loader.sh \
test_grep_proc.sh \ test_grep_proc.sh \
@ -296,6 +298,11 @@ TESTS = \
test_yajlpp \ test_yajlpp \
test_pretty_print.sh test_pretty_print.sh
if HAVE_LIBCURL
TESTS += \
test_curl.sh
endif
DISTCLEANFILES = \ DISTCLEANFILES = \
*.dat \ *.dat \
*.err \ *.err \

@ -0,0 +1,48 @@
#! /bin/bash
if test x"$SFTP_TEST_URL" == x""; then
exit 0
fi
run_test ${lnav_test} -n \
file://${test_dir}/logfile_access_log.0
check_output "file URL is not working" <<EOF
192.168.202.254 - - [20/Jul/2009:22:59:26 +0000] "GET /vmw/cgi/tramp HTTP/1.0" 200 134 "-" "gPXE/0.9.7"
192.168.202.254 - - [20/Jul/2009:22:59:29 +0000] "GET /vmw/vSphere/default/vmkboot.gz HTTP/1.0" 404 46210 "-" "gPXE/0.9.7"
192.168.202.254 - - [20/Jul/2009:22:59:29 +0000] "GET /vmw/vSphere/default/vmkernel.gz HTTP/1.0" 200 78929 "-" "gPXE/0.9.7"
EOF
cp ${test_dir}/logfile_access_log.0 curl_access_log.0
run_test ${lnav_test} -n \
$SFTP_TEST_URL/`pwd`/curl_access_log.0
check_output "sftp URL is not working" <<EOF
192.168.202.254 - - [20/Jul/2009:22:59:26 +0000] "GET /vmw/cgi/tramp HTTP/1.0" 200 134 "-" "gPXE/0.9.7"
192.168.202.254 - - [20/Jul/2009:22:59:29 +0000] "GET /vmw/vSphere/default/vmkboot.gz HTTP/1.0" 404 46210 "-" "gPXE/0.9.7"
192.168.202.254 - - [20/Jul/2009:22:59:29 +0000] "GET /vmw/vSphere/default/vmkernel.gz HTTP/1.0" 200 78929 "-" "gPXE/0.9.7"
EOF
run_test ${lnav_test} -n \
-c ":poll-now" \
-c ":shexec echo foo >> curl_access_log.0" \
$SFTP_TEST_URL/`pwd`/curl_access_log.0
check_output "sftp URL is not working" <<EOF
192.168.202.254 - - [20/Jul/2009:22:59:26 +0000] "GET /vmw/cgi/tramp HTTP/1.0" 200 134 "-" "gPXE/0.9.7"
192.168.202.254 - - [20/Jul/2009:22:59:29 +0000] "GET /vmw/vSphere/default/vmkboot.gz HTTP/1.0" 404 46210 "-" "gPXE/0.9.7"
192.168.202.254 - - [20/Jul/2009:22:59:29 +0000] "GET /vmw/vSphere/default/vmkernel.gz HTTP/1.0" 200 78929 "-" "gPXE/0.9.7"
foo
EOF
run_test ${lnav_test} -n \
-c ":open $SFTP_TEST_URL/`pwd`/curl_access_log.0" \
${test_dir}/logfile_empty.0
check_output "sftp URL is not working" <<EOF
192.168.202.254 - - [20/Jul/2009:22:59:26 +0000] "GET /vmw/cgi/tramp HTTP/1.0" 200 134 "-" "gPXE/0.9.7"
192.168.202.254 - - [20/Jul/2009:22:59:29 +0000] "GET /vmw/vSphere/default/vmkboot.gz HTTP/1.0" 404 46210 "-" "gPXE/0.9.7"
192.168.202.254 - - [20/Jul/2009:22:59:29 +0000] "GET /vmw/vSphere/default/vmkernel.gz HTTP/1.0" 200 78929 "-" "gPXE/0.9.7"
foo
EOF
Loading…
Cancel
Save