From f2869508544a355f715a77b21cad9ffc31a5ad48 Mon Sep 17 00:00:00 2001 From: Timothy Stack Date: Sun, 2 Aug 2015 06:43:57 -0700 Subject: [PATCH] [curl] add a curl looper to handle url requests --- NEWS | 1 + README | 1 + TESTS_ENVIRONMENT.in | 6 + configure.ac | 2 + src/CMakeLists.txt | 4 + src/Makefile.am | 4 + src/curl_looper.cc | 258 +++++++++++++++++++++++++++++++++++++++++++ src/curl_looper.hh | 240 ++++++++++++++++++++++++++++++++++++++++ src/lnav.cc | 18 ++- src/lnav.hh | 3 + src/lnav_commands.cc | 32 +++++- src/lnav_log.cc | 12 +- src/lnav_log.hh | 4 + src/lnav_util.cc | 11 ++ src/lnav_util.hh | 13 +++ src/pthreadpp.hh | 51 +++++++++ src/url_loader.hh | 143 ++++++++++++++++++++++++ test/Makefile.am | 7 ++ test/test_curl.sh | 48 ++++++++ 19 files changed, 855 insertions(+), 3 deletions(-) create mode 100644 src/curl_looper.cc create mode 100644 src/curl_looper.hh create mode 100644 src/pthreadpp.hh create mode 100644 src/url_loader.hh create mode 100644 test/test_curl.sh diff --git a/NEWS b/NEWS index bdf8e6ff..4acacec9 100644 --- a/NEWS +++ b/NEWS @@ -4,6 +4,7 @@ lnav v0.8.0: * Integration with "papertrailapp.com" for querying and tailing server log and syslog messages. See the Papertrail section in the online help for more details. + * Remote files can be opened when linked with libcurl. * Log formats that are "containers" for other log formats, like syslog, are now supported. See the online help for more information. diff --git a/README b/README index 1d03366e..dcca9dbc 100644 --- a/README +++ b/README @@ -24,6 +24,7 @@ Lnav requires the following software packages: zlib - The zlib compression library. bz2 - The bzip2 compression library. re2c - The re2c scanner generator. + libcurl - The cURL library for downloading files from URLs. 
INSTALLATION diff --git a/TESTS_ENVIRONMENT.in b/TESTS_ENVIRONMENT.in index 5c2a8057..31bc805a 100644 --- a/TESTS_ENVIRONMENT.in +++ b/TESTS_ENVIRONMENT.in @@ -33,9 +33,15 @@ test_num=0 lnav_test="${top_builddir}/src/lnav-test" export lnav_test +lnav="${top_builddir}/src/lnav" +export lnav + LNAV_LOG_PATH="${top_builddir}/test/test.log" export LNAV_LOG_PATH +SFTP_TEST_URL="@SFTP_TEST_URL@" +export SFTP_TEST_URL + ## BEGIN Functions LAST_TEST="" diff --git a/configure.ac b/configure.ac index 7c81472a..97977955 100644 --- a/configure.ac +++ b/configure.ac @@ -44,6 +44,8 @@ CPPFLAGS="$CPPFLAGS -D_ISOC99_SOURCE -D__STDC_LIMIT_MACROS" # CFLAGS=`echo $CFLAGS | sed 's/-O2//g'` # CXXFLAGS=`echo $CXXFLAGS | sed 's/-O2//g'` +AC_ARG_VAR(SFTP_TEST_URL) + AC_ARG_ENABLE([profiling], AS_HELP_STRING([--enable-profiling], [Compile with gprof(1) profiling support])) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index e7fb336e..85352dec 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -4,6 +4,7 @@ set(diag_STAT_SRCS bookmarks.cc collation-functions.cc command_executor.cc + curl_looper.cc db_sub_source.cc environ_vtab.cc extension-functions.c @@ -81,6 +82,7 @@ set(diag_STAT_SRCS command_executor.hh concise_index.hh column_namer.hh + curl_looper.hh field_overlay_source.hh filter_observer.hh format-text-files.hh @@ -98,6 +100,7 @@ set(diag_STAT_SRCS plain_text_source.hh pretty_printer.hh ptimec.hh + pthreadpp.hh readline_callbacks.hh readline_possibilities.hh sequence_sink.hh @@ -109,6 +112,7 @@ set(diag_STAT_SRCS textfile_sub_source.hh time_T.hh top_status_source.hh + url_loader.hh views_vtab.hh yajl/api/yajl_common.h diff --git a/src/Makefile.am b/src/Makefile.am index ddeee2cb..f07b30b0 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -102,6 +102,7 @@ noinst_HEADERS = \ column_namer.hh \ command_executor.hh \ concise_index.hh \ + curl_looper.hh \ data_scanner.hh \ data_scanner_re.re \ data_parser.hh \ @@ -143,6 +144,7 @@ noinst_HEADERS = \ piper_proc.hh \ 
plain_text_source.hh \ pretty_printer.hh \ + pthreadpp.hh \ ptimec.hh \ readline_callbacks.hh \ readline_curses.hh \ @@ -166,6 +168,7 @@ noinst_HEADERS = \ time_T.hh \ timer.hh \ top_status_source.hh \ + url_loader.hh \ view_curses.hh \ views_vtab.hh \ vt52_curses.hh \ @@ -196,6 +199,7 @@ libdiag_a_SOURCES = \ bookmarks.cc \ collation-functions.cc \ command_executor.cc \ + curl_looper.cc \ db_sub_source.cc \ environ_vtab.cc \ extension-functions.c \ diff --git a/src/curl_looper.cc b/src/curl_looper.cc new file mode 100644 index 00000000..d44f8a5d --- /dev/null +++ b/src/curl_looper.cc @@ -0,0 +1,258 @@ +/** + * Copyright (c) 2015, Timothy Stack + * + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * * Neither the name of Timothy Stack nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. 
IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON + * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * @file curl_looper.cc + */ + +#include "config.h" + +#ifdef HAVE_LIBCURL +#include + +#include "curl_looper.hh" + +using namespace std; + +struct curl_request_eq { + curl_request_eq(const std::string &name) : cre_name(name) { + }; + + bool operator()(const curl_request *cr) const { + return this->cre_name == cr->get_name(); + }; + + bool operator()(const pair &pair) const { + return this->cre_name == pair.second->get_name(); + }; + + const std::string &cre_name; +}; + +int curl_request::debug_cb(CURL *handle, + curl_infotype type, + char *data, + size_t size, + void *userp) { + curl_request *cr = (curl_request *) userp; + + if (type == CURLINFO_TEXT) { + while (size > 0 && isspace(data[size - 1])) { + size -= 1; + } + log_debug("%s:%.*s", cr->get_name().c_str(), size, data); + } + + return 0; +} + +void *curl_looper::trampoline(void *arg) +{ + curl_looper *cl = (curl_looper *) arg; + + return cl->run(); +} + +void *curl_looper::run() +{ + log_info("curl looper thread started"); + while (this->cl_looping) { + this->loop_body(); + } + log_info("curl looper thread exiting"); + + return NULL; +} + +void curl_looper::loop_body() +{ + mstime_t current_time = getmstime(); + int timeout = this->compute_timeout(current_time); + + if (this->cl_handle_to_request.empty()) { + mutex_guard mg(this->cl_mutex); + + if (this->cl_new_requests.empty() && this->cl_close_requests.empty()) { + mstime_t deadline = current_time + timeout; + struct timespec ts; + 
+ ts.tv_sec = deadline / 1000ULL; + ts.tv_nsec = (deadline % 1000ULL) * 1000 * 1000; + log_trace("no requests in progress, waiting %d ms for new ones", + timeout); + pthread_cond_timedwait(&this->cl_cond, &this->cl_mutex, &ts); + } + } + + this->perform_io(); + + this->check_for_finished_requests(); + + this->check_for_new_requests(); + + this->requeue_requests(current_time + 5); +} + +void curl_looper::perform_io() +{ + if (this->cl_handle_to_request.empty()) { + return; + } + + mstime_t current_time = getmstime(); + int timeout = this->compute_timeout(current_time); + int running_handles; + + curl_multi_wait(this->cl_curl_multi, + NULL, + 0, + timeout, + NULL); + curl_multi_perform(this->cl_curl_multi, &running_handles); +} + +void curl_looper::requeue_requests(mstime_t up_to_time) +{ + while (!this->cl_poll_queue.empty() && + this->cl_poll_queue.front().first <= up_to_time) { + curl_request *cr = this->cl_poll_queue.front().second; + + log_debug("%s:polling request is ready again -- %p", + cr->get_name().c_str(), cr); + this->cl_handle_to_request[cr->get_handle()] = cr; + curl_multi_add_handle(this->cl_curl_multi, cr->get_handle()); + this->cl_poll_queue.erase(this->cl_poll_queue.begin()); + } +} + +void curl_looper::check_for_new_requests() { + mutex_guard mg(this->cl_mutex); + + while (!this->cl_new_requests.empty()) { + curl_request *cr = this->cl_new_requests.back(); + + log_info("%s:new curl request %p", + cr->get_name().c_str(), + cr); + this->cl_handle_to_request[cr->get_handle()] = cr; + curl_multi_add_handle(this->cl_curl_multi, cr->get_handle()); + this->cl_new_requests.pop_back(); + } + while (!this->cl_close_requests.empty()) { + const std::string &name = this->cl_close_requests.back(); + vector::iterator all_iter = find_if( + this->cl_all_requests.begin(), + this->cl_all_requests.end(), + curl_request_eq(name)); + + log_info("attempting to close request -- %s", name.c_str()); + if (all_iter != this->cl_all_requests.end()) { + map::iterator act_iter; 
+ vector<pair<mstime_t, curl_request *> >::iterator poll_iter; + curl_request *cr = *all_iter; + + log_info("%s:closing request -- %p", + cr->get_name().c_str(), cr); + (*all_iter)->close(); + act_iter = this->cl_handle_to_request.find(cr->get_handle()); /* the map is keyed by the easy handle, not the request */ + if (act_iter != this->cl_handle_to_request.end()) { + this->cl_handle_to_request.erase(act_iter); + curl_multi_remove_handle(this->cl_curl_multi, + cr->get_handle()); + } + poll_iter = find_if(this->cl_poll_queue.begin(), + this->cl_poll_queue.end(), + curl_request_eq(name)); + if (poll_iter != this->cl_poll_queue.end()) { + this->cl_poll_queue.erase(poll_iter); + } + this->cl_all_requests.erase(all_iter); + delete cr; /* free the request whether it was active or waiting in the poll queue */ + } + else { + log_error("Unable to find request with the name -- %s", + name.c_str()); + } + + this->cl_close_requests.pop_back(); + + pthread_cond_broadcast(&this->cl_cond); + } +} + +void curl_looper::check_for_finished_requests() +{ + CURLMsg *msg; + int msgs_left; + + while ((msg = curl_multi_info_read(this->cl_curl_multi, &msgs_left)) != NULL) { + if (msg->msg != CURLMSG_DONE) { + continue; + } + + CURL *easy = msg->easy_handle; + map<CURL *, curl_request *>::iterator iter = this->cl_handle_to_request.find(easy); + + curl_multi_remove_handle(this->cl_curl_multi, easy); + if (iter != this->cl_handle_to_request.end()) { + curl_request *cr = iter->second; + long delay_ms; + + this->cl_handle_to_request.erase(iter); + delay_ms = cr->complete(msg->data.result); /* negative: request is done; otherwise: re-poll after this many ms */ + if (delay_ms < 0) { + vector<curl_request *>::iterator all_iter; + + log_info("%s:curl_request %p finished, deleting...", + cr->get_name().c_str(), cr); + { + mutex_guard mg(this->cl_mutex); + + all_iter = find(this->cl_all_requests.begin(), + this->cl_all_requests.end(), + cr); + if (all_iter != this->cl_all_requests.end()) { + this->cl_all_requests.erase(all_iter); + } + pthread_cond_broadcast(&cl_cond); + } + delete cr; + } + else { + log_debug("%s:curl_request %p is polling, requeueing in %ld", + cr->get_name().c_str(), + cr, + delay_ms); + this->cl_poll_queue.push_back( + make_pair(getmstime() + delay_ms, cr)); + 
sort(this->cl_poll_queue.begin(), this->cl_poll_queue.end()); + } + } + } +} + +#endif diff --git a/src/curl_looper.hh b/src/curl_looper.hh new file mode 100644 index 00000000..f3f3cfc0 --- /dev/null +++ b/src/curl_looper.hh @@ -0,0 +1,240 @@ +/** + * Copyright (c) 2015, Timothy Stack + * + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * * Neither the name of Timothy Stack nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON + * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ * + * @file curl_looper.hh + */ + +#ifndef curl_looper_hh +#define curl_looper_hh + +#include +#include +#include + +#ifndef HAVE_LIBCURL + +typedef int CURLcode; + +class curl_request { +public: + curl_request(const std::string &name) { + }; +}; + +class curl_looper { +public: + void start() { }; + void add_request(curl_request *cr) { }; + void close_request(const std::string &name) { }; + void process_all() { }; +}; + +#else +#include + +#include "auto_mem.hh" +#include "lnav_log.hh" +#include "lnav_util.hh" +#include "pthreadpp.hh" + +class curl_request { +public: + curl_request(const std::string &name) + : cr_name(name), + cr_open(true), + cr_handle(curl_easy_cleanup), + cr_completions(0) { + this->cr_handle.reset(curl_easy_init()); + curl_easy_setopt(this->cr_handle, CURLOPT_NOSIGNAL, 1); + curl_easy_setopt(this->cr_handle, CURLOPT_ERRORBUFFER, this->cr_error_buffer); + curl_easy_setopt(this->cr_handle, CURLOPT_DEBUGFUNCTION, debug_cb); + curl_easy_setopt(this->cr_handle, CURLOPT_DEBUGDATA, this); + curl_easy_setopt(this->cr_handle, CURLOPT_VERBOSE, 1); + }; + + virtual ~curl_request() { + + }; + + const std::string &get_name() const { + return this->cr_name; + }; + + virtual void close() { + this->cr_open = false; + }; + + bool is_open() { + return this->cr_open; + }; + + CURL *get_handle() const { + return this->cr_handle; + }; + + int get_completions() const { + return this->cr_completions; + }; + + virtual long complete(CURLcode result) { + double total_time = 0, download_size = 0, download_speed = 0; + + this->cr_completions += 1; + curl_easy_getinfo(this->cr_handle, CURLINFO_TOTAL_TIME, &total_time); + log_debug("%s: total_time=%f", this->cr_name.c_str(), total_time); + curl_easy_getinfo(this->cr_handle, CURLINFO_SIZE_DOWNLOAD, &download_size); + log_debug("%s: download_size=%f", this->cr_name.c_str(), download_size); + curl_easy_getinfo(this->cr_handle, CURLINFO_SPEED_DOWNLOAD, &download_speed); + log_debug("%s: download_speed=%f", 
this->cr_name.c_str(), download_speed); + + return -1; + }; + +protected: + + static int debug_cb(CURL *handle, + curl_infotype type, + char *data, + size_t size, + void *userp); + + const std::string cr_name; + bool cr_open; + auto_mem cr_handle; + char cr_error_buffer[CURL_ERROR_SIZE]; + int cr_completions; +}; + +class curl_looper { +public: + curl_looper() + : cl_started(false), + cl_looping(true), + cl_curl_multi(curl_multi_cleanup) { + this->cl_curl_multi.reset(curl_multi_init()); + pthread_mutex_init(&this->cl_mutex, NULL); + pthread_cond_init(&this->cl_cond, NULL); + }; + + ~curl_looper() { + this->stop(); + pthread_cond_destroy(&this->cl_cond); + pthread_mutex_destroy(&this->cl_mutex); + } + + void start() { + if (pthread_create(&this->cl_thread, NULL, trampoline, this) == 0) { + this->cl_started = true; + } + }; + + void stop() { + if (this->cl_started) { + void *result; + + this->cl_looping = false; + { + mutex_guard mg(this->cl_mutex); + + pthread_cond_broadcast(&this->cl_cond); + } + log_debug("waiting for curl_looper thread"); + pthread_join(this->cl_thread, &result); + log_debug("curl_looper thread joined"); + } + }; + + void process_all() { + this->check_for_new_requests(); + + this->requeue_requests(LONG_MAX); + + while (!this->cl_handle_to_request.empty()) { + this->perform_io(); + + this->check_for_finished_requests(); + } + }; + + void add_request(curl_request *cr) { + mutex_guard mg(this->cl_mutex); + + require(cr != NULL); + + this->cl_all_requests.push_back(cr); + this->cl_new_requests.push_back(cr); + pthread_cond_broadcast(&this->cl_cond); + }; + + void close_request(const std::string &name) { + mutex_guard mg(this->cl_mutex); + + this->cl_close_requests.push_back(name); + pthread_cond_broadcast(&this->cl_cond); + }; + +private: + + void *run(); + void loop_body(); + void perform_io(); + void check_for_new_requests(); + void check_for_finished_requests(); + void requeue_requests(mstime_t up_to_time); + + int compute_timeout(mstime_t 
current_time) const { + int retval = 1000; + + if (!this->cl_poll_queue.empty()) { + retval = std::max( + 1LL, this->cl_poll_queue.front().first - current_time); + } + + ensure(retval > 0); + + return retval; + }; + + static void *trampoline(void *arg); + + bool cl_started; + pthread_t cl_thread; + volatile bool cl_looping; + auto_mem cl_curl_multi; + pthread_mutex_t cl_mutex; + pthread_cond_t cl_cond; + std::vector cl_all_requests; + std::vector cl_new_requests; + std::vector cl_close_requests; + std::map cl_handle_to_request; + std::vector > cl_poll_queue; + +}; +#endif + +#endif diff --git a/src/lnav.cc b/src/lnav.cc index 5ed9c15f..5a5a2731 100644 --- a/src/lnav.cc +++ b/src/lnav.cc @@ -131,6 +131,7 @@ #include "hotkeys.hh" #include "readline_possibilities.hh" #include "field_overlay_source.hh" +#include "url_loader.hh" using namespace std; @@ -1236,7 +1237,10 @@ static void expand_filename(string path, bool required) { static_root_mem gl; - if (glob(path.c_str(), GLOB_NOCHECK, NULL, gl.inout()) == 0) { + if (is_url(path.c_str())) { + return; + } + else if (glob(path.c_str(), GLOB_NOCHECK, NULL, gl.inout()) == 0) { int lpc; if (gl->gl_pathc == 1 /*&& gl.gl_matchc == 0*/) { @@ -2272,6 +2276,7 @@ int main(int argc, char *argv[]) case 'd': lnav_data.ld_debug_log_name = optarg; + lnav_log_level = LOG_LEVEL_TRACE; break; case 'a': @@ -2561,6 +2566,14 @@ int main(int argc, char *argv[]) if (startswith(argv[lpc], "pt:")) { lnav_data.ld_pt_search = argv[lpc]; } +#ifdef HAVE_LIBCURL + else if (is_url(argv[lpc])) { + auto_ptr ul(new url_loader(argv[lpc])); + + lnav_data.ld_file_names.insert(make_pair(argv[lpc], ul->copy_fd().release())); + lnav_data.ld_curl_looper.add_request(ul.release()); + } +#endif else if (is_glob(argv[lpc])) { lnav_data.ld_file_names.insert(make_pair(argv[lpc], -1)); } @@ -2740,6 +2753,7 @@ int main(int argc, char *argv[]) log_info("Executing initial commands"); execute_init_commands(msgs); wait_for_pipers(); + 
lnav_data.ld_curl_looper.process_all(); rebuild_indexes(false); for (msg_iter = msgs.begin(); @@ -2799,6 +2813,8 @@ int main(int argc, char *argv[]) } } else { + lnav_data.ld_curl_looper.start(); + init_session(); log_info(" session_id=%s", lnav_data.ld_session_id.c_str()); diff --git a/src/lnav.hh b/src/lnav.hh index 16564bc9..8a475f30 100644 --- a/src/lnav.hh +++ b/src/lnav.hh @@ -59,6 +59,7 @@ #include "piper_proc.hh" #include "term_extra.hh" #include "ansi_scrubber.hh" +#include "curl_looper.hh" #include "papertrail_proc.hh" /** The command modes that are available while viewing a file. */ @@ -241,6 +242,8 @@ struct _lnav_data { term_extra ld_term_extra; input_state_tracker ld_input_state; + + curl_looper ld_curl_looper; }; extern struct _lnav_data lnav_data; diff --git a/src/lnav_commands.cc b/src/lnav_commands.cc index c8e7ad91..fbe04a96 100644 --- a/src/lnav_commands.cc +++ b/src/lnav_commands.cc @@ -47,6 +47,7 @@ #include "lnav_commands.hh" #include "session_data.hh" #include "command_executor.hh" +#include "url_loader.hh" using namespace std; @@ -1187,7 +1188,20 @@ static string com_open(string cmdline, vector &args) auto_mem abspath; struct stat st; - if (is_glob(fn.c_str())) { + if (is_url(fn.c_str())) { +#ifndef HAVE_LIBCURL + retval = "error: lnav was not compiled with libcurl"; +#else + auto_ptr ul(new url_loader(fn)); + + lnav_data.ld_file_names.insert(make_pair(fn, ul->copy_fd().release())); + lnav_data.ld_curl_looper.add_request(ul.release()); + lnav_data.ld_files_to_front.push_back(make_pair(fn, top)); + + retval = "info: opened URL"; +#endif + } + else if (is_glob(fn.c_str())) { lnav_data.ld_file_names.insert(make_pair(fn, -1)); retval = "info: watching -- " + fn; } @@ -1273,6 +1287,9 @@ static string com_close(string cmdline, vector &args) } } if (!fn.empty()) { + if (is_url(fn.c_str())) { + lnav_data.ld_curl_looper.close_request(fn); + } lnav_data.ld_file_names.erase(make_pair(fn, -1)); lnav_data.ld_closed_files.insert(fn); retval = "info: 
closed -- " + fn; @@ -1714,6 +1731,18 @@ static string com_shexec(string cmdline, vector &args) return ""; } +static string com_poll_now(string cmdline, vector &args) +{ + if (args.empty()) { + + } + else { + lnav_data.ld_curl_looper.process_all(); + } + + return ""; +} + static string com_redraw(string cmdline, vector &args) { if (args.empty()) { @@ -1776,5 +1805,6 @@ void init_lnav_commands(readline_context::command_map_t &cmd_map) if (getenv("lnav_test") != NULL) { cmd_map["rebuild"] = com_rebuild; cmd_map["shexec"] = com_shexec; + cmd_map["poll-now"] = com_poll_now; } } diff --git a/src/lnav_log.cc b/src/lnav_log.cc index 55381cab..29242102 100644 --- a/src/lnav_log.cc +++ b/src/lnav_log.cc @@ -42,6 +42,7 @@ #include #include #include +#include #ifdef HAVE_EXECINFO_H #include @@ -68,6 +69,7 @@ #endif #include "lnav_log.hh" +#include "pthreadpp.hh" static const size_t BUFFER_SIZE = 256 * 1024; static const size_t MAX_LOG_LINE_SIZE = 2048; @@ -83,9 +85,10 @@ static const char *CRASH_MSG = "=========================\n"; FILE *lnav_log_file; -lnav_log_level_t lnav_log_level; +lnav_log_level_t lnav_log_level = LOG_LEVEL_DEBUG; const char *lnav_log_crash_dir; const struct termios *lnav_log_orig_termios; +static pthread_mutex_t lnav_log_mutex = PTHREAD_MUTEX_INITIALIZER; log_state_dumper::log_state_list log_state_dumper::DUMPER_LIST; @@ -101,6 +104,7 @@ static struct { }; static const char *LEVEL_NAMES[] = { + "T", "D", "I", "W", @@ -196,6 +200,8 @@ void log_msg(lnav_log_level_t level, const char *src_file, int line_number, return; } + mutex_guard mg(lnav_log_mutex); + va_start(args, fmt); gettimeofday(&curr_time, NULL); localtime_r(&curr_time.tv_sec, &localtm); @@ -233,6 +239,8 @@ void log_msg_extra(const char *fmt, ...) 
ssize_t rc; char *line; + mutex_guard mg(lnav_log_mutex); + va_start(args, fmt); line = log_alloc(); rc = vsnprintf(line, MAX_LOG_LINE_SIZE - 1, fmt, args); @@ -248,6 +256,8 @@ void log_msg_extra_complete() { char *line; + mutex_guard mg(lnav_log_mutex); + line = log_alloc(); line[0] = '\n'; log_ring.lr_length += 1; diff --git a/src/lnav_log.hh b/src/lnav_log.hh index e51e5a78..487f1383 100644 --- a/src/lnav_log.hh +++ b/src/lnav_log.hh @@ -46,6 +46,7 @@ #endif enum lnav_log_level_t { + LOG_LEVEL_TRACE, LOG_LEVEL_DEBUG, LOG_LEVEL_INFO, LOG_LEVEL_WARNING, @@ -113,6 +114,9 @@ extern enum lnav_log_level_t lnav_log_level; #define log_debug(fmt...) \ log_msg_wrapper(LOG_LEVEL_DEBUG, fmt); +#define log_trace(fmt...) \ + log_msg_wrapper(LOG_LEVEL_TRACE, fmt); + #define require(e) \ ((void) ((e) ? 0 : __require (#e, __FILE__, __LINE__))) #define __require(e, file, line) \ diff --git a/src/lnav_util.cc b/src/lnav_util.cc index 56766b3a..b421a9cc 100644 --- a/src/lnav_util.cc +++ b/src/lnav_util.cc @@ -41,9 +41,20 @@ #include "auto_fd.hh" #include "lnav_util.hh" +#include "pcrepp.hh" using namespace std; +bool is_url(const char *fn) +{ + static pcrepp url_re("^(file|https?|ftps?|scp|sftp):"); /* no empty alternative: a bare leading ':' is not a URL */ + + pcre_context_static<30> pc; + pcre_input pi(fn); + + return url_re.match(pc, pi); +} + std::string hash_string(const std::string &str) { byte_array<2, uint64> hash; diff --git a/src/lnav_util.hh b/src/lnav_util.hh index 23c1bdf8..9d9227e3 100644 --- a/src/lnav_util.hh +++ b/src/lnav_util.hh @@ -36,6 +36,7 @@ #include #include +#include #include #include @@ -107,6 +108,16 @@ inline time_t hour_num(time_t ti) std::string time_ago(time_t last_time); +typedef int64_t mstime_t; + +inline mstime_t getmstime() { + struct timeval tv; + + gettimeofday(&tv, NULL); + + return tv.tv_sec * 1000ULL + tv.tv_usec / 1000ULL; +} + #if SIZEOF_OFF_T == 8 #define FORMAT_OFF_T "%qd" #elif SIZEOF_OFF_T == 4 @@ -173,6 +184,8 @@ inline bool is_glob(const char *fn) strchr(fn, '[') != NULL); }; +bool 
is_url(const char *fn); + inline bool startswith(const char *str, const char *prefix) { return strncmp(str, prefix, strlen(prefix)) == 0; diff --git a/src/pthreadpp.hh b/src/pthreadpp.hh new file mode 100644 index 00000000..d253fee6 --- /dev/null +++ b/src/pthreadpp.hh @@ -0,0 +1,51 @@ +/** + * Copyright (c) 2015, Timothy Stack + * + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * * Neither the name of Timothy Stack nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON + * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ * + * @file pthreadpp.hh + */ + +#ifndef pthreadpp_hh +#define pthreadpp_hh + +#include + +class mutex_guard { +public: + mutex_guard(pthread_mutex_t &mutex) : mg_mutex(mutex) { + pthread_mutex_lock(&mutex); + }; + + ~mutex_guard() { + pthread_mutex_unlock(&this->mg_mutex); + }; + +private: + pthread_mutex_t &mg_mutex; +}; + +#endif diff --git a/src/url_loader.hh b/src/url_loader.hh new file mode 100644 index 00000000..da8c148f --- /dev/null +++ b/src/url_loader.hh @@ -0,0 +1,143 @@ +/** + * Copyright (c) 2015, Timothy Stack + * + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * * Neither the name of Timothy Stack nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. 
IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON + * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef url_loader_hh +#define url_loader_hh + +#ifdef HAVE_LIBCURL +#include + +class url_loader : public curl_request { +public: + url_loader(const std::string &url) : curl_request(url), ul_resume_offset(0) { + char piper_tmpname[PATH_MAX]; + const char *tmpdir; + + if ((tmpdir = getenv("TMPDIR")) == NULL) { + tmpdir = _PATH_VARTMP; + } + snprintf(piper_tmpname, sizeof(piper_tmpname), + "%s/lnav.url.XXXXXX", + tmpdir); + if ((this->ul_fd = mkstemp(piper_tmpname)) == -1) { + return; + } + + unlink(piper_tmpname); /* the open descriptor keeps the now-unlinked temp file usable */ + + curl_easy_setopt(this->cr_handle, CURLOPT_URL, this->cr_name.c_str()); + curl_easy_setopt(this->cr_handle, CURLOPT_WRITEFUNCTION, write_cb); + curl_easy_setopt(this->cr_handle, CURLOPT_WRITEDATA, this); + curl_easy_setopt(this->cr_handle, CURLOPT_FILETIME, 1); + }; + + int get_fd() const { + return this->ul_fd.get(); + }; + + auto_fd copy_fd() const { + return this->ul_fd; + }; + + long complete(CURLcode result) { + curl_request::complete(result); + + switch (result) { + case CURLE_OK: + break; + case CURLE_BAD_DOWNLOAD_RESUME: + break; + default: + log_error("%s:curl failure -- %ld %s", + this->cr_name.c_str(), (long) result, curl_easy_strerror(result)); /* cast: CURLcode is an enum, "%ld" needs a long */ + write(this->ul_fd, this->cr_error_buffer, strlen(this->cr_error_buffer)); + return -1; + } + + long file_time; + CURLcode rc; + + rc = curl_easy_getinfo(this->cr_handle, CURLINFO_FILETIME, &file_time); + if (rc == CURLE_OK) { + time_t current_time; + + time(&current_time); + if 
(file_time == -1 || + (current_time - file_time) < FOLLOW_IF_MODIFIED_SINCE) { + char range[64]; + struct stat st; + off_t start; + + fstat(this->ul_fd, &st); + if (st.st_size > 0) { + start = st.st_size - 1; + this->ul_resume_offset = 1; + } + else { + start = 0; + this->ul_resume_offset = 0; + } + snprintf(range, sizeof(range), "%ld-", (long) start); + curl_easy_setopt(this->cr_handle, CURLOPT_RANGE, range); + return 2000; + } + else { + log_debug("URL was not recently modified, not tailing: %s", + this->cr_name.c_str()); + } + } + else { + log_error("Could not get file time for URL: %s -- %s", + this->cr_name.c_str(), curl_easy_strerror(rc)); + } + + return -1; + }; + +private: + static const long FOLLOW_IF_MODIFIED_SINCE = 60 * 60; + + static ssize_t write_cb(void *contents, size_t size, size_t nmemb, void *userp) { + url_loader *ul = (url_loader *) userp; + char *c_contents = (char *) contents; + ssize_t retval; + + c_contents += ul->ul_resume_offset; + retval = write(ul->ul_fd, c_contents, (size * nmemb) - ul->ul_resume_offset); + retval += ul->ul_resume_offset; + ul->ul_resume_offset = 0; + return retval; + } + + auto_fd ul_fd; + off_t ul_resume_offset; +}; +#endif + +#endif diff --git a/test/Makefile.am b/test/Makefile.am index d4892e97..155bd581 100644 --- a/test/Makefile.am +++ b/test/Makefile.am @@ -158,6 +158,7 @@ drive_sql_LDADD = \ $(PCRE_LIBS) \ $(CURSES_LIB) \ $(READLINE_LIBS) \ + $(LIBCURL) \ -lpcrecpp slicer_SOURCES = slicer.cc @@ -169,6 +170,7 @@ scripty_LDADD = ../src/libdiag.a dist_noinst_SCRIPTS = \ parser_debugger.py \ test_cmds.sh \ + test_curl.sh \ test_data_parser.sh \ test_format_loader.sh \ test_grep_proc.sh \ @@ -296,6 +298,11 @@ TESTS = \ test_yajlpp \ test_pretty_print.sh +if HAVE_LIBCURL +TESTS += \ + test_curl.sh +endif + DISTCLEANFILES = \ *.dat \ *.err \ diff --git a/test/test_curl.sh b/test/test_curl.sh new file mode 100644 index 00000000..be3d1cd8 --- /dev/null +++ b/test/test_curl.sh @@ -0,0 +1,48 @@ +#! 
/bin/bash + +if test x"$SFTP_TEST_URL" == x""; then + exit 0 +fi + +run_test ${lnav_test} -n \ + file://${test_dir}/logfile_access_log.0 + +check_output "file URL is not working" <> curl_access_log.0" \ + $SFTP_TEST_URL/`pwd`/curl_access_log.0 + +check_output "sftp URL is not working" <