first commit

Kevin Lynx 11 years ago
commit d23b3293f1

.gitignore vendored

@@ -0,0 +1,3 @@
deps
*.swp
bin

@@ -0,0 +1,28 @@
{'src/*',
[debug_info,
{i, "include"},
{outdir,"ebin"}]}.
{'src/bt/*',
[debug_info,
{i, "include"},
{outdir,"ebin"}]}.
{'src/common/*',
[debug_info,
{i, "include"},
{outdir,"ebin"}]}.
{'src/http_front/*',
[debug_info,
{i, "include"},
{outdir,"ebin"}]}.
{'src/hash_reader/*',
[debug_info,
{i, "include"},
{outdir,"ebin"}]}.
{'src/crawler/*',
[debug_info,
{i, "include"},
{outdir,"ebin"}]}.
{'src/torrent/*',
[debug_info,
{i, "include"},
{outdir,"ebin"}]}.

@@ -0,0 +1,37 @@
## dhtcrawler
dhtcrawler is a DHT crawler written in Erlang. It can join a DHT network and crawl many P2P torrents. The program saves all torrent info into a database and provides an HTTP interface to search torrents by keyword.
![screenshot](https://raw.github.com/kevinlynx/dhtcrawler/master/screenshot.png)
## Usage
* Download MongoDB and start it with text search enabled, e.g.:
mongod --dbpath db --setParameter textSearchEnabled=true
* Download dhtcrawler source code
* Use `rebar` to download all dependent libraries
rebar get-deps
* Compile
rebar compile
* Start dhtcrawler in an Erlang shell
crawler_app:start().
* Start the HTTP front-end
crawler_http:start().
* Open a web browser and point to `localhost:8000/index.html`
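For example, a complete session from the project root might look like this (a sketch; it assumes `rebar compile` succeeded and MongoDB is already running — `crawler_app:start/0` and `crawler_http:start/0` add the `deps/*/ebin` code paths themselves):

erl -pa ebin
1> crawler_app:start().
2> crawler_http:start().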
## Config
See `priv/dhtcrawler.config`.
**NOTE**: when you change the `node_count` value in dhtcrawler.config, you should delete all files saved in the `dhtstate` directory.
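Each key, as read by `crawler_app:do_start/1` in this commit: `start_port` is the first UDP port, DHT instance N listens on `start_port + N`; `node_count` is the number of DHT node instances to start; `loglevel` is the vlog level (3 = errors only); `dbconn` is the MongoDB connection pool size; `dbhost`/`dbport` locate MongoDB. If the file is missing, these defaults are written on first startup:

[{start_port, 6776},
{node_count, 10},
{loglevel, 3},
{dbconn, 15},
{dbhost, "localhost"},
{dbport, 27017}].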

@@ -0,0 +1,4 @@
-define(HASH_DBNAME, dht_hash).
-define(HASH_COLLNAME, hash).
-define(HASH_DOWNLOAD_COLL, wait_download).

@@ -0,0 +1,32 @@
%%
%% vlog.hrl
%% Kevin Lynx
%% 06.05.2013
%%
-ifndef(VLOGHRL).
-define(VLOGHRL, true).
-define(TRACE, 0).
-define(INFO, 1).
-define(WARN, 2).
-define(ERROR, 3).
-define(OFF, 4).
-define(LVLS(L),
case L of
?TRACE -> "trac";
?INFO -> "info";
?WARN -> "warn";
?ERROR -> "error"
end).
-define(LOG(X, Lvl),
vlog:format(Lvl, "~s [~s] {~p, ~p}: ~p~n",
[?LVLS(Lvl), vlog:time_string(), ?MODULE, ?LINE, X])).
-define(T(X), ?LOG(X, ?TRACE)).
-define(I(X), ?LOG(X, ?INFO)).
-define(W(X), ?LOG(X, ?WARN)).
-define(E(X), ?LOG(X, ?ERROR)).
-define(FMT(S, A), lists:flatten(io_lib:format(S, A))).
-endif.

@@ -0,0 +1,6 @@
[{start_port,6776},
{node_count,50},
{loglevel,3},
{dbconn, 5},
{dbhost,"localhost"},
{dbport,27017}].

BIN
rebar vendored

Binary file not shown.

@@ -0,0 +1,4 @@
@echo off
setlocal
set rebarscript=%~f0
escript.exe "%rebarscript:.cmd=%" %*

@@ -0,0 +1,9 @@
{erl_opts, [debug_info, fail_on_warning]}.
{deps, [
{ibrowse, ".*", {git, "git@github.com:cmullaparthi/ibrowse.git", "HEAD"}},
{bson, ".*", {git, "git@github.com:mongodb/bson-erlang.git", "HEAD"}},
{mongodb, ".*", {git, "git@github.com:mongodb/mongodb-erlang.git", "HEAD"}},
{kdht, ".*", {git, "git@github.com:kevinlynx/kdht.git", "HEAD"}}
]}.

@@ -0,0 +1,151 @@
-module(bt_conn).
-behaviour(gen_server).
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-export([start/4,
stop/1]).
-export([test/0]).
-record(state, {data, sock, hash, req = 0,
msgid = 0, metasize = 0, metainfo = <<>>,
support_utdata = false}).
-define(UT_METADATA_MSGID, 1).
start(IP, Port, Hash, Self) ->
gen_server:start_link(?MODULE, [IP, Port, Hash, Self], []).
stop(Pid) ->
gen_server:cast(Pid, stop).
init([IP, Port, Hash, Self]) ->
{ok, {IP, Port, Hash, Self}, 0}.
% match the record explicitly: #state{} is itself a tuple, so a bare
% is_tuple/1 guard would shadow this clause and the socket would never close
terminate(_, State) when is_record(State, state) ->
#state{sock = Sock} = State,
gen_tcp:close(Sock),
{ok, State};
terminate(_, State) ->
{ok, State}.
code_change(_, _, State) ->
{ok, State}.
handle_cast(stop, State) ->
{stop, normal, State}.
handle_info({tcp, Sock, Data}, State) ->
#state{data = Old, req = Req} = State,
FullData = <<Old/binary, Data/binary>>,
{NewReq, NewData, Msgs} = parse_message(Req, FullData, []),
inet:setopts(Sock, [{active,once}]),
NewState = process_message(State#state{data = NewData, req = NewReq}, Msgs),
{noreply, NewState};
handle_info(timeout, {IP, Port, Hash, Self}) ->
case gen_tcp:connect(IP, Port, [binary, {active, once}]) of
{ok, Sock} ->
State = #state{data = <<>>, sock = Sock, hash = Hash, req = 1},
Handshake = bt_message:encode_handshake(Hash, Self),
gen_tcp:send(Sock, Handshake),
ExtHandshake = bt_message:encode_ext_handshake(?UT_METADATA_MSGID),
gen_tcp:send(Sock, ExtHandshake),
{noreply, State};
{error, _Reason} ->
% the peer is unreachable; stop instead of sending on a bad socket
{stop, normal, {IP, Port, Hash, Self}}
end;
handle_info(_, State) ->
{noreply, State}.
handle_call(_, _From, State) ->
{noreply, State}.
parse_message(1, Bin, Msgs) ->
{_, _, _, Rest} = bt_message:decode_handshake(Bin),
parse_message(2, Rest, Msgs);
parse_message(Req, <<>>, Msgs) ->
{Req, <<>>, lists:reverse(Msgs)};
parse_message(Req, Bin, Msgs) ->
case bt_message:decode(Bin) of
{Rest, not_completed} ->
{Req, Rest, Msgs};
{Rest, Msg} ->
parse_message(Req + 1, Rest, [Msg|Msgs])
end.
process_message(State, []) ->
State;
process_message(State, [Msg|Rest]) ->
NewState = process_message(State, Msg),
process_message(NewState, Rest);
process_message(State, not_implemented) ->
State;
process_message(State, {extend, handshake, Dict, _}) ->
{ok, {dict, M}} = dict:find(<<"m">>, Dict),
{ok, ID} = dict:find(<<"ut_metadata">>, M),
{ok, Size} = dict:find(<<"metadata_size">>, Dict),
% request the metainfo
start_req_metainfo(State, ID),
State#state{msgid = ID, metasize = Size};
process_message(State, {extend, MsgID, Dict, Body}) ->
?UT_METADATA_MSGID = MsgID,
{ok, FP} = file:open("meta.torrent", [append]),
file:write(FP, Body),
file:close(FP),
MetaMsg = bt_message:decode_metadata_msg(Dict, Body),
process_meta_message(State, MetaMsg).
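% BEP 9 metadata exchange, as implemented below (a sketch of the flow; the
% 16 KiB piece size is fixed by the ut_metadata spec, not by this module):
% after the extension handshake we request piece 0, and on every `data'
% message we append the payload and request Piece + 1 until the accumulated
% binary reaches the `metadata_size' announced in the handshake.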
% request, reject right now
process_meta_message(State, {request, _}) ->
#state{sock = Sock, msgid = MsgID} = State,
Msg = bt_message:encode_metadata_reject(MsgID),
gen_tcp:send(Sock, Msg),
State;
% data
process_meta_message(State, {data, Piece, Size, Data}) ->
#state{metainfo = OldInfo, metasize = MetaSize, sock = Sock, msgid = MsgID} = State,
Size = MetaSize,
NewInfo = <<OldInfo/binary, Data/binary>>,
case byte_size(NewInfo) >= MetaSize of
true ->
% load metainfo done, TODO: notify
io:format("download metainfo done ~p ~n", [Size]),
file:write_file("meta.torrent", NewInfo),
ok;
false ->
% request next piece
ReqMsg = bt_message:encode_metadata_req(MsgID, Piece + 1),
gen_tcp:send(Sock, ReqMsg),
ok
end,
State#state{metainfo = NewInfo};
% reject, ignore right now
process_meta_message(State, {reject, _}) ->
State.
start_req_metainfo(State, ID) ->
#state{sock = Sock} = State,
ReqMsg = bt_message:encode_metadata_req(ID, 0),
gen_tcp:send(Sock, ReqMsg).
%%%
test() ->
I = bep9:random(),
Peer = <<I:160>>,
%Hash = <<200,126,2,108,203,24,198,144,173,99,133,8,141,160,119,166,176,58,126,169>>,
Hash = <<77,16,191,99,137,133,31,179,255,232,239,14,116,98,74,114,233,232,39,248>>,
%Hash = <<252,152,147,123,241,68,124,54,123,130,135,101,215,57,9, 59,102,111,53,209>>,
start({127, 0, 0, 1}, 6881, Hash, Peer).

@@ -0,0 +1,122 @@
%%
%% bt_message.erl
%% Kevin Lynx
%% 06.24.2013
%%
-module(bt_message).
-export([decode/1,
decode_handshake/1,
encode_handshake/2,
decode_metadata_msg/2,
encode_ext_handshake/1,
encode_metadata_req/2,
encode_metadata_reject/1,
decode_metadata_id/1]).
-compile(export_all).
-define(MSG_EXTEND, 20).
-define(MSG_EXT_HANDSHAKE, 0).
-define(PROTO_NAME, <<"BitTorrent protocol">>).
encode_handshake(Hash, PeerID) when is_binary(Hash), is_binary(PeerID) ->
% to support extend message, ext[5] |= 0x10
Ext = <<0, 0, 0, 0, 0, 16, 0, 0>>,
<<19:8, ?PROTO_NAME/binary, Ext/binary, Hash/binary, PeerID/binary>>.
decode_handshake(Bin) when is_binary(Bin) ->
<<PLen:8, R/binary>> = Bin,
<<PName:PLen/binary, Ext:8/binary, Hash:20/binary, PeerID:20/binary, Rest/binary>> = R,
case PName == ?PROTO_NAME of
true ->
{Ext, Hash, PeerID, Rest};
false ->
{error, Bin}
end.
% decode/1 returns one of:
% {R, not_completed}
% {R, {extend, handshake, Dict, Body}}
% {R, {extend, ExtMsgID, Dict, Body}}
% {R, not_implemented}
decode(Bin) when is_binary(Bin) ->
<<Size:32, Rest/binary>> = Bin,
case byte_size(Rest) < Size of
true ->
{Bin, not_completed};
false ->
decode(Size, Rest)
end.
decode(0, <<>>) -> % keep-alive
{<<>>, not_implemented};
decode(Size, <<Type:8, Rest/binary>>) ->
% (whole size) - (message type)
BodySize = Size - 1,
case Type of
?MSG_EXTEND ->
{Code, Dict, BodyR, R} = decode_extend(BodySize, Rest),
{R, {extend, Code, Dict, BodyR}};
0 ->
{Rest, not_implemented};
_ ->
<<_:BodySize/binary, R/binary>> = Rest,
{R, not_implemented}
end.
decode_extend(Size, Bin) ->
% exclude the extend message id
DictSize = Size - 1,
<<ID:8, Body:DictSize/binary, Rest/binary>> = Bin,
case ID of
?MSG_EXT_HANDSHAKE ->
{{dict, D}, R} = bencode:dec(Body),
{handshake, D, R, Rest};
_ ->
{{dict, D}, R} = bencode:dec(Body),
% may have extra data, e.g. the `data' message in ut_metadata
{ID, D, R, Rest}
end.
decode_metadata_id(Dict) ->
{ok, {dict, M}} = dict:find(<<"m">>, Dict),
case dict:find(<<"ut_metadata">>, M) of
{ok, ID} ->
ID;
_ ->
not_support
end.
% should check id match first
decode_metadata_msg(Dict, Bin) ->
{ok, Type} = dict:find(<<"msg_type">>, Dict),
{ok, Piece} = dict:find(<<"piece">>, Dict),
case Type of
0 -> % request
Bin = <<>>,
{request, Piece};
1 -> % data
% total_size is the total size of the torrent metadata, not the size of this piece
{ok, Size} = dict:find(<<"total_size">>, Dict),
{data, Piece, Size, Bin};
2 -> % reject
Bin = <<>>,
{reject, Piece}
end.
encode_ext_msg(MsgID, Dict) ->
Body = bencode:encode(Dict),
Len = byte_size(Body) + 2,
<<Len:32, 20:8, MsgID:8, Body/binary>>.
encode_ext_handshake(MetaMsgID) ->
M = {dict, dict:from_list([{<<"ut_metadata">>, MetaMsgID}])},
Dict = {dict, dict:from_list([{<<"m">>, M}])},
encode_ext_msg(0, Dict).
encode_metadata_req(MsgID, Piece) ->
Dict = {dict, dict:from_list([{<<"msg_type">>, 0}, {<<"piece">>, Piece}])},
encode_ext_msg(MsgID, Dict).
encode_metadata_reject(MsgID) ->
Dict = {dict, dict:from_list([{<<"msg_type">>, 2}, {<<"piece">>, 0}])},
encode_ext_msg(MsgID, Dict).
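% Wire-format example (a sketch assuming bencode:encode/1 emits canonical
% bencoding): encode_ext_handshake(1) builds the 24-byte body
% "d1:md11:ut_metadatai1eee" ({"m":{"ut_metadata":1}}), so the message is
% <<26:32, 20:8, 0:8, "d1:md11:ut_metadatai1eee">>
% i.e. length prefix 26 (body + 2), BT message type 20 (extended),
% extended message id 0 (handshake), per BEP 10.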

@@ -0,0 +1,40 @@
%%
%% string_split.erl
%% Kevin Lynx
%% 06.17.2013
%% split a string into substrings
%%
-module(string_split).
-export([split/1]).
-compile(export_all).
split(Str) when is_list(Str) ->
B = list_to_binary(Str),
case unicode:characters_to_list(B) of
{error, L, D} ->
{error, L, D};
{incomplete, L, D} ->
{incomplete, L, D};
UL ->
{ok, subsplit(UL)}
end.
subsplit([]) ->
[];
subsplit(L) ->
[_|R] = L,
{PreL, _} = lists:splitwith(fun(Ch) -> not is_spliter(Ch) end, L),
[unicode:characters_to_binary(lists:sublist(PreL, Len))
|| Len <- lists:seq(1, length(PreL))] ++ subsplit(R).
% TODO: more control characters
is_spliter(Ch) ->
(Ch < $0) or
((Ch >= $[) and (Ch =< $`)) or
((Ch >= ${) and (Ch =< $~)) or
((Ch >= $:) and (Ch =< $@)).
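% Example (assuming valid UTF-8 input):
% split("ab cd") -> {ok, [<<"a">>,<<"ab">>,<<"b">>,<<"c">>,<<"cd">>,<<"d">>]}
% i.e. every contiguous substring of every word; db_store_mongo stores this
% list in the `name_array' field that backs the text index.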

@@ -0,0 +1,36 @@
%%
%% time_util.erl
%% Kevin Lynx
%% 06.18.2013
%%
-module(time_util).
-export([now_seconds/0,
diff_milsecs/2,
seconds_to_local_time/1,
now_day_seconds/0]).
-compile(export_all).
diff_milsecs(T1, T2) ->
timer:now_diff(T1, T2) div 1000.
now_seconds() ->
{Megasecs, Secs, Microsecs} = now(),
(Megasecs * 1000000) + Secs + (Microsecs div 1000000).
seconds_to_local_time(Secs) ->
% Secs is seconds since the Unix epoch; 62167219200 is
% calendar:datetime_to_gregorian_seconds({{1970,1,1},{0,0,0}})
Datetime = calendar:gregorian_seconds_to_datetime(Secs + 62167219200),
calendar:universal_time_to_local_time(Datetime).
now_day_seconds() ->
{Date, _Time} = calendar:local_time(),
UTC = local_time_to_universal_time({Date, {0, 0, 0}}),
% convert gregorian seconds back to seconds since the Unix epoch
calendar:datetime_to_gregorian_seconds(UTC) - 62167219200.
local_time_to_universal_time(Datetime) ->
case calendar:local_time_to_universal_time_dst(Datetime) of
[_, DateTimeUTC] ->
DateTimeUTC;
[DateTimeUTC] ->
DateTimeUTC
end.

@@ -0,0 +1,90 @@
%%
%% vlog.erl
%% Kevin Lynx
%% 06.05.2013
%%
-module(vlog).
-behaviour(gen_server).
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-export([start_link/2,
format/3,
set_level/1,
sync_format/3,
time_string/0,
stop/0]).
-record(state, {name, level}).
-include("vlog.hrl").
start_link(Name, Lvl) ->
gen_server:start_link({local, srv_name()}, ?MODULE, [Name, Lvl], []).
stop() ->
gen_server:cast(srv_name(), stop).
set_level(Level) ->
gen_server:call(srv_name(), {set_level, Level}).
format(Lvl, Fmt, Arg) ->
gen_server:cast(srv_name(), {write, Lvl, Fmt, Arg}).
sync_format(Lvl, Fmt, Arg) ->
gen_server:call(srv_name(), {write, Lvl, Fmt, Arg}).
time_string() ->
{{_Year, _Month, _Day}, {Hour, Min, Sec}} = erlang:localtime(),
lists:flatten(io_lib:format("~2.10.0b:~2.10.0b:~2.10.0b", [Hour, Min, Sec])).
srv_name() ->
?MODULE.
init([Name, Lvl]) ->
{ok, FP} = file:open(Name, [write]),
file:close(FP),
{ok, #state{name = Name, level = Lvl}}.
handle_info(_, State) ->
{noreply, State}.
handle_cast({write, Level, Fmt, Arg}, State) ->
do_log(State, Level, Fmt, Arg),
{noreply, State};
handle_cast(stop, State) ->
{stop, normal, State};
handle_cast(_, State) ->
{noreply, State}.
terminate(_, State) ->
{ok, State}.
code_change(_, _, State) ->
{ok, State}.
handle_call({write, Level, Fmt, Arg}, _From, State) ->
do_log(State, Level, Fmt, Arg),
{reply, ok, State};
handle_call({set_level, Level}, _From, State) ->
{reply, ok, State#state{level = Level}};
handle_call(_, _From, State) ->
{reply, not_implemented, State}.
do_log(State, Level, Fmt, Arg) ->
#state{name = Name, level = MaxLevel} = State,
case Level >= MaxLevel of
true -> append(Name, Fmt, Arg);
false -> false
end.
append(Name, Fmt, Arg) ->
{ok, FP} = file:open(Name, [append]),
io:format(FP, Fmt, Arg),
file:close(FP).
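% Usage sketch (with the macros from vlog.hrl):
% vlog:start_link("app.log", ?INFO),
% ?I("server started"), % kept: INFO >= INFO
% ?T(some_detail). % dropped: TRACE < INFO
% do_log/4 keeps messages whose level is >= the configured level.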

@@ -0,0 +1,64 @@
%%
%% crawler_app.erl
%% Kevin Lynx
%% 06.19.2013
%%
-module(crawler_app).
-behaviour(application).
-export([start/2, stop/1]).
-export([start/0, startboth/0, stop/0]).
start(_Type, _StartArgs) ->
File = config_file_name(),
io:format("load config file ~s ", [File]),
case file:consult(File) of
{error, _Reason} ->
do_default_start(File);
{ok, [Cfg]} ->
do_start(Cfg)
end.
stop(_State) ->
crawler_sup:stop().
config_file_name() ->
filename:join([filename:dirname(code:which(?MODULE)),
"..", "priv", "dhtcrawler.config"]).
do_default_start(File) ->
List = [{start_port, 6776},
{node_count, 10},
{loglevel, 3},
{dbconn, 15},
{dbhost, "localhost"},
{dbport, 27017}],
filelib:ensure_dir("priv/"),
file:write_file(File, io_lib:fwrite("~p.\n",[List])),
do_start(List).
do_start(List) ->
StartPort = proplists:get_value(start_port, List),
Count = proplists:get_value(node_count, List),
LogLevel = proplists:get_value(loglevel, List),
DBConn = proplists:get_value(dbconn, List),
DBHost = proplists:get_value(dbhost, List),
DBPort = proplists:get_value(dbport, List),
io:format("dhtcrawler startup ~p, ~p, ~p:~p~n", [StartPort, Count, DBHost, DBPort]),
crawler_sup:start_link({StartPort, Count, DBHost, DBPort, LogLevel, DBConn}).
start() ->
error_logger:logfile({open, "crash.log"}),
code:add_path("deps/bson/ebin"),
code:add_path("deps/kdht/ebin"),
code:add_path("deps/mongodb/ebin"),
Apps = [asn1, crypto, public_key, ssl, inets, bson, mongodb],
[application:start(App) || App <- Apps],
application:start(dhtcrawler).
stop() ->
application:stop(dhtcrawler).
startboth() ->
start(),
crawler_http:start().

@@ -0,0 +1,161 @@
%%
%% crawler_stats.erl
%% Kevin Lynx
%% 06.17.2013
%%
-module(crawler_stats).
-behaviour(gen_server).
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-export([start_link/0,
stop/0,
saved/1,
announce/0,
get_peers/0,
dump/0,
get_stats_desc/0,
get_stats/0]).
-export([handle_new/0,
handle_update/0,
handle_init/1]).
-include("crawler_stats.hrl").
-define(DUMP_INTERVAL, 30*60*1000).
start_link() ->
gen_server:start_link({local, srv_name()}, ?MODULE, [], []).
stop() ->
gen_server:cast(srv_name(), stop).
announce() ->
gen_server:cast(srv_name(), {announce}).
get_peers() ->
gen_server:cast(srv_name(), {get_peers}).
saved(New) ->
Type = case New of
true -> new_torrent;
false -> update_torrent
end,
gen_server:cast(srv_name(), {Type}).
get_stats() ->
gen_server:call(srv_name(), {get_stats}).
get_stats_desc() ->
gen_server:call(srv_name(), {get_stats_desc}).
% TODO: re-arrange the process relationship
handle_init(Sum) ->
start_link(),
gen_server:call(srv_name(), {set_sum, Sum}).
handle_new() ->
saved(true).
handle_update() ->
saved(false).
dump() ->
gen_server:cast(srv_name(), {dump}).
srv_name() ->
?MODULE.
init([]) ->
timer:send_interval(?DUMP_INTERVAL, {dump_stats}), % cancel the timer ?
{ok, #crawler_stats{start_time = now(), torrent_sum = 0}}.
terminate(_, State) ->
{ok, State}.
code_change(_, _, State) ->
{ok, State}.
handle_cast({new_torrent}, State) ->
#crawler_stats{new_saved = Saved, torrent_sum = Sum, torrent_count = ThisCount} = State,
{noreply, State#crawler_stats{new_saved = Saved + 1,
torrent_count = ThisCount + 1,
torrent_sum = Sum + 1}};
handle_cast({update_torrent}, State) ->
#crawler_stats{updated = U, torrent_count = ThisCount} = State,
{noreply, State#crawler_stats{updated = U + 1, torrent_count = ThisCount + 1}};
handle_cast({announce}, State) ->
#crawler_stats{announce_count = A} = State,
{noreply, State#crawler_stats{announce_count = A + 1}};
handle_cast({get_peers}, State) ->
#crawler_stats{get_peers_count = G} = State,
{noreply, State#crawler_stats{get_peers_count = G + 1}};
handle_cast({dump}, State) ->
do_dump(State),
{noreply, State};
handle_cast(stop, State) ->
{stop, normal, State}.
handle_info({dump_stats}, State) ->
do_dump(State),
{noreply, State};
handle_info(_, State) ->
{noreply, State}.
handle_call({set_sum, Sum}, _From, State) ->
{reply, ok, State#crawler_stats{torrent_sum = Sum}};
handle_call({get_stats_desc}, _From, State) ->
{reply, format_stats(State), State};
handle_call({get_stats}, _From, State) ->
Elapsed = stats_time(State#crawler_stats.start_time),
Ret = {Elapsed, State},
{reply, Ret, State};
handle_call(_, _From, State) ->
{noreply, State}.
stats_time(Start) ->
DiffSecs = timer:now_diff(now(), Start) div 1000 div 1000,
% {day, {hour, min, sec}}
calendar:seconds_to_daystime(DiffSecs).
date_string() ->
{{Year, Month, Day}, {Hour, Min, Sec}} = erlang:localtime(),
lists:flatten(io_lib:format("~b-~2.10.0b-~2.10.0b ~2.10.0b:~2.10.0b:~2.10.0b",
[Year, Month, Day, Hour, Min, Sec])).
-define(TEXT(Fmt, Arg), lists:flatten(io_lib:format(Fmt, Arg))).
do_dump(State) ->
{ok, FP} = file:open("dhtcrawler-stats.txt", [append]),
io:format(FP, "~s~n", [date_string()]),
io:format(FP, "~s~n", [format_stats(State)]),
file:close(FP).
format_stats(State) ->
#crawler_stats{
get_peers_count = G,
announce_count = A,
torrent_count = ThisCount,
new_saved = New,
updated = U,
torrent_sum = Sum
} = State,
{Day, {H, M, S}} = stats_time(State#crawler_stats.start_time),
?TEXT(" stats time ~b ~2.10.0b:~2.10.0b:~2.10.0b~n",
[Day, H, M, S]) ++
?TEXT(" torrent sum ~b~n", [Sum]) ++
?TEXT(" get_peers count ~b~n", [G]) ++
?TEXT(" announce count ~b~n", [A]) ++
?TEXT(" download torrent ~b~n", [ThisCount]) ++
?TEXT(" new saved ~b~n", [New]) ++
?TEXT(" updated ~b~n", [U]).

@@ -0,0 +1,10 @@
-record(crawler_stats, {
start_time = now(),
announce_count = 0,
get_peers_count = 0,
torrent_count = 0, % torrents successfully downloaded, counted even if duplicated
new_saved = 0, % saved into the database as a new torrent
updated = 0, % already existed in the database and was updated
torrent_sum % the total torrent count
}).

@@ -0,0 +1,75 @@
%%
%% crawler_sup.erl
%% Kevin Lynx
%% 06.14.2013
%%
-module(crawler_sup).
-behaviour(supervisor).
-include("vlog.hrl").
-export([init/1]).
-export([start_link/1,
stop/0,
save_all_state/0]).
start_link(Args) ->
supervisor:start_link({local, srv_name()}, ?MODULE, [Args]).
stop() ->
save_all_state(),
exit(whereis(srv_name()), normal).
srv_name() ->
dht_crawler_sup.
init([{StartPort, Count, DBHost, DBPort, LogLevel, DBConn}]) ->
Spec = {one_for_one, 1, 600},
Instances = create_dht_instance(StartPort, Count),
Logger = [{dht_logger, {vlog, start_link, ["dht_crawler.txt", LogLevel]},
permanent, 2000, worker, dynamic}],
%Downloader = [{torrent_downloader, {torrent_download, start_link, []},
% permanent, 2000, worker, dynamic}],
%DBStorer = [{torrent_index, {torrent_index, start_link, [DBHost, DBPort, crawler_stats, DBConn]},
% permanent, 2000, worker, dynamic}],
HashInserter = [{db_hash, {db_hash, start_link, [DBHost, DBPort, DBConn]},
permanent, 2000, worker, dynamic}],
Stats = [{crawler_stats, {crawler_stats, start_link, []},
permanent, 2000, worker, dynamic}],
%Children = Logger ++ Downloader ++ DBStorer ++ Instances,
Children = Logger ++ HashInserter ++ Stats ++ Instances,
{ok, {Spec, Children}}.
create_dht_instance(StartPort, Count) ->
Dir = "dhtstate/",
filelib:ensure_dir(Dir),
IDs = create_discrete_ids(Count),
Generator = lists:zip(IDs, lists:seq(StartPort, StartPort + Count - 1)),
[{instance_name(Port),
{kdht_sup, start_link, [instance_state_file(Dir, Port), Port, dht_monitor, ID]},
permanent, 1000, supervisor, [kdht]}
|| {ID, Port} <- Generator].
instance_name(Port) ->
Name = lists:flatten(io_lib:format("dht_instance_~p", [Port])),
list_to_atom(Name).
instance_state_file(Dir, Port) ->
lists:flatten(io_lib:format("~sstate~b", [Dir, Port])).
create_discrete_ids(1) ->
[dht_id:random()];
create_discrete_ids(Count) ->
Max = dht_id:max(),
Piece = Max div Count,
[random:uniform(Piece) + Index * Piece || Index <- lists:seq(0, Count - 1)].
save_all_state() ->
[save_instance_state(Instance) || Instance <- supervisor:which_children(srv_name())].
save_instance_state({_ID, Pid, supervisor, [kdht]}) ->
kdht_sup:save_state(Pid);
save_instance_state({_ID, _Pid, worker, _}) ->
ok.

@@ -0,0 +1,117 @@
%%
%% dht_monitor.erl
%% Kevin Lynx
%% 06.14.2013
%% TODO: better to use gen_server
%%
-module(dht_monitor).
-include("vlog.hrl").
-export([handle_event/2,
handle_torrent/3,
process_infohash_event/1,
save_to_db/3,
tell_more_nodes/1]).
-export([debug_dump/4,
debug_dump_failed/1]).
-define(QUERY_INTERVAL, 1*60*1000).
% per test logs, `get_peers' events far outnumber `announce_peer' events
handle_event(announce_peer, {_InfoHash, _IP, _BTPort}) ->
crawler_stats:announce();
handle_event(get_peers, {InfoHash, _IP, _Port}) ->
crawler_stats:get_peers(),
%spawn(?MODULE, process_infohash_event, [InfoHash]);
MagHash = dht_id:tohex(InfoHash),
db_hash:insert(MagHash);
handle_event(startup, {MyID}) ->
spawn(?MODULE, tell_more_nodes, [MyID]).
% some operations wait indefinitely, so spawn a new process.
% NOTE: this may create many processes, depending on database operation speed.
process_infohash_event(InfoHash) ->
MagHash = dht_id:tohex(InfoHash),
Wait = 60*1000,
try
case torrent_index:inc_announce(MagHash, Wait) of
true ->
crawler_stats:saved(false);
false ->
download(InfoHash)
end
catch
exit:{timeout, _} ->
?E(?FMT("inc_announce timeout exception for ~s", [MagHash]))
end.
tell_more_nodes(MyID) ->
search:get_peers(MyID, dht_id:random()),
timer:sleep(?QUERY_INTERVAL),
tell_more_nodes(MyID). % tail recursive, be careful
download(InfoHash) ->
MagHash = dht_id:tohex(InfoHash),
torrent_download:download(MagHash, ?MODULE).
handle_torrent(ok, MagHash, TContent) ->
case catch(torrent_file:parse(TContent)) of
{'EXIT', _} ->
?E(?FMT("parse torrent file failed ~p", [TContent]));
{Type, Info} ->
save_to_db(MagHash, Type, Info)
end,
ok;
handle_torrent(error, _MagHash, _TContent) ->
ok.
save_to_db(MagHash, single, {Name, Length}) ->
torrent_index:insert(MagHash, Name, Length);
save_to_db(MagHash, multi, {Root, Files}) ->
torrent_index:insert(MagHash, Root, 0, Files).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
debug_dump_failed(MagHash) ->
io:format("download ~s failed~n", [MagHash]),
{ok, FP} = file:open("magnets.txt", [append]),
io:format(FP, "~s~n", [format_magnet(MagHash)]),
io:format(FP, " download torrent failed [~s]~n", [get_time_string()]),
file:close(FP).
get_time_string() ->
{{Year, Month, Day}, {Hour, Min, Sec}} = erlang:localtime(),
Str = io_lib:format("~b-~2.10.0b-~2.10.0b ~2.10.0b:~2.10.0b:~2.10.0b",
[Year, Month, Day, Hour, Min, Sec]),
lists:flatten(Str).
format_magnet(MagHash) ->
"magnet:?xt=urn:btih:" ++ MagHash.
output_torrent_info(single, {Name, Length}, FP) ->
io:format(FP, " ~s ~s~n", [Name, torrent_file:size_string(Length)]);
output_torrent_info(multi, {Root, Files}, FP) ->
io:format(FP, " ~s~n", [Root]),
[io:format(FP, " ~s ~s~n", [Path, torrent_file:size_string(Length)])
|| {Path, Length} <- Files].
debug_dump(MagHash, TContent, Type, Info) ->
Filename = save_torrent(MagHash, TContent),
io:format("download ~s success (~p byts), save as ~s~n",
[MagHash, byte_size(TContent), Filename]),
TSize = byte_size(TContent),
{ok, FP} = file:open("magnets.txt", [append]),
io:format(FP, "~s~n", [format_magnet(MagHash)]),
io:format(FP, " download torrent success [~s] ~s (~s)~n",
[get_time_string(), Filename, torrent_file:size_string(TSize)]),
output_torrent_info(Type, Info, FP),
file:close(FP).
save_torrent(MagHash, TContent) ->
Dir = "download/",
filelib:ensure_dir(Dir),
Filename = Dir ++ MagHash ++ ".torrent",
file:write_file(Filename, TContent),
Filename.

@@ -0,0 +1,84 @@
%%
%% db_hash.erl
%% Kevin Lynx
%% 06.28.2013
%% save info_hash to database
%%
-module(db_hash).
-behaviour(gen_server).
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-export([start_link/2,
start_link/3,
insert/1,
stop/0]).
-record(state, {hash_docs, index}).
-define(MAX_HASH, 1000).
-define(POOL_NAME, hash_db_pool).
-include("db_common.hrl").
start_link(IP, Port, Size) ->
gen_server:start_link({local, srv_name()}, ?MODULE, [IP, Port, Size], []).
start_link(IP, Port) ->
start_link(IP, Port, 5).
stop() ->
gen_server:cast(srv_name(), stop).
% magnet hash given as a hex string; stored as binary
insert(Hash) when is_list(Hash) ->
gen_server:cast(srv_name(), {insert, list_to_binary(Hash)}).
srv_name() ->
db_hash.
init([IP, Port, Size]) ->
mongo_sup:start_pool(?POOL_NAME, Size, {IP, Port}),
process_flag(trap_exit, true),
{_, Index} = db_system:load_batch_index(mongo_pool:get(?POOL_NAME)),
{ok, #state{hash_docs = [], index = Index}}.
terminate(_, State) ->
mongo_sup:stop_pool(?POOL_NAME),
{ok, State}.
code_change(_, _, State) ->
{ok, State}.
handle_cast({insert, Hash}, State) ->
#state{hash_docs = List, index = Index} = State,
{NewIndex, NewList} = try_insert(Index, Hash, List),
{noreply, State#state{hash_docs = NewList, index = NewIndex}};
handle_cast(stop, State) ->
{stop, normal, State}.
handle_call(_, _From, State) ->
{noreply, State}.
handle_info(_, State) ->
{noreply, State}.
%% DB stuff
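% Batching sketch: hashes accumulate in the gen_server state and are flushed
% in a single unsafe bulk insert once ?MAX_HASH (1000) docs are pending; every
% doc carries the current batch `index', and db_system's write index is
% bumped once per flushed batch.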
try_insert(Index, Hash, List) when length(List) + 1 == ?MAX_HASH ->
Conn = mongo_pool:get(?POOL_NAME),
mongo:do(unsafe, master, Conn, ?HASH_DBNAME, fun() ->
mongo:insert(?HASH_COLLNAME, [create_hash_doc(Index, Hash)|List])
end),
db_system:inc_batch_windex(Conn),
db_system:stats_query_inserted(Conn, ?MAX_HASH),
{Index + 1, []};
try_insert(Index, Hash, List) ->
Doc = create_hash_doc(Index, Hash),
{Index, [Doc|List]}.
create_hash_doc(Index, Hash) when is_binary(Hash) ->
{hash, Hash, index, Index, req_at, time_util:now_seconds()}.

@@ -0,0 +1,271 @@
%%
%% db_store_mongo.erl
%% Kevin Lynx
%% 06.16.2013
%%
-module(db_store_mongo).
-include("vlog.hrl").
-export([init/2,
close/1,
insert/5,
unsafe_insert/2,
count/1,
inc_announce/2,
exist/2,
index/2,
search_announce_top/2,
search_recently/2,
search/2]).
-compile(export_all).
-define(DBNAME, torrents).
-define(COLLNAME, hashes).
-define(SEARCH_COL, name_array).
init(Host, Port) ->
{ok, Conn} = mongo_connection:start_link({Host, Port}),
?I(?FMT("connect mongodb ~p:~p success", [Host, Port])),
enable_text_search(Conn),
ensure_search_index(Conn),
Conn.
close(Conn) ->
mongo_connection:stop(Conn).
count(Conn) ->
mongo_do(Conn, fun() ->
mongo:count(?COLLNAME, {})
end).
exist(Conn, Hash) when is_list(Hash) ->
case find_exist(Conn, Hash) of
{} -> false;
_ -> true
end.
% {Rets, {Found, CostTime}}
search(Conn, Key) when is_list(Key) ->
BinColl = list_to_binary(atom_to_list(?COLLNAME)),
BinKey = list_to_binary(Key),
Ret = mongo_do(Conn, fun() ->
mongo:command({text, BinColl, search, BinKey})
end),
{decode_search(Ret), decode_search_stats(Ret)}.
search_announce_top(Conn, Count) ->
Sel = {'$query', {}, '$orderby', {announce, -1}},
List = mongo_do(Conn, fun() ->
% mongodb-erlang does not provide cursor limit()/sort() functions, weird,
% but it works here
Cursor = mongo:find(?COLLNAME, Sel, [], 0, Count),
mongo_cursor:rest(Cursor)
end),
[decode_torrent_item(Item) || Item <- List].
% db.hashes.find({$query:{},$orderby:{created_at: -1}}).limit(10);
search_recently(Conn, Count) ->
Sel = {'$query', {}, '$orderby', {created_at, -1}},
List = mongo_do(Conn, fun() ->
Cursor = mongo:find(?COLLNAME, Sel, [], 0, Count),
mongo_cursor:rest(Cursor)
end),
[decode_torrent_item(Item) || Item <- List].
index(Conn, Hash) when is_list(Hash) ->
Ret = mongo_do(Conn, fun() ->
mongo:find_one(?COLLNAME, {'_id', list_to_binary(Hash)})
end),
case Ret of
{} -> {};
{Torrent} -> decode_torrent_item(Torrent)
end.
insert(Conn, Hash, Name, Length, Files) when is_list(Hash) ->
NewDoc = create_torrent_desc(Hash, Name, Length, 1, Files),
mongo_do(Conn, fun() ->
%mongo:insert(?COLLNAME, NewDoc)
% since the doc may already exist (inc_announce failed), we update the doc here
Sel = {'_id', list_to_binary(Hash)},
mongo:update(?COLLNAME, Sel, NewDoc, true)
end).
unsafe_insert(Conn, Tors) when is_list(Tors) ->
Docs = [create_torrent_desc(Hash, Name, Length, 1, Files) ||
{Hash, Name, Length, Files} <- Tors],
mongo:do(unsafe, master, Conn, ?DBNAME, fun() ->
mongo:insert(?COLLNAME, Docs)
end).
inc_announce(Conn, Hash) when is_list(Hash) ->
inc_announce(Conn, list_to_binary(Hash));
inc_announce(Conn, Hash) when is_binary(Hash) ->
% damn, mongodb-erlang does not support updating a single field of an object;
% `findAndModify` works, but it changes the `announce' datatype to double
Cmd = {findAndModify, ?COLLNAME, query, {'_id', Hash},
update, {'$inc', {announce, 1}},
new, true},
Ret = mongo_do(Conn, fun() ->
mongo:command(Cmd)
end),
case Ret of
{value, undefined, ok, 1.0} -> false;
{value, _Obj, lastErrorObject, {updatedExisting, true, n, 1}, ok, 1.0} -> true;
_ -> false
end.
ensure_search_index(Conn) ->
Spec = {key, {?SEARCH_COL, <<"text">>}},
mongo_do(Conn, fun() ->
mongo:ensure_index(?COLLNAME, Spec)
end).
% does not seem to work; start mongod with --setParameter textSearchEnabled=true instead (see README)
enable_text_search(Conn) ->
Cmd = {setParameter, 1, textSearchEnabled, true},
mongo:do(safe, master, Conn, admin, fun() ->
mongo:command(Cmd)
end).
create_torrent_desc(Hash, Name, Length, Announce, Files) ->
NameArray = case string_split:split(Name) of
{error, L, D} ->
?E(?FMT("string split failed(error): ~p ~p", [L, D])),
[Name];
{incomplete, L, D} ->
?E(?FMT("string split failed(incomplte): ~p ~p", [L, D])),
[Name];
{ok, R} -> R
end,
{'_id', list_to_binary(Hash),
name, list_to_binary(Name),
name_array, NameArray,
length, Length,
created_at, time_util:now_seconds(),
announce, Announce,
files, encode_file_list(Files)}.
% {file1, {name, xx, length, xx}, file2, {name, xx, length, xx}}
encode_file_list(Files) ->
Keys = ["file"++integer_to_list(Index) || Index <- lists:seq(1, length(Files))],
Generator = lists:zip(Keys, Files),
list_to_tuple(lists:flatten([[list_to_atom(Key), {name, list_to_binary(Name), length, Length}]
|| {Key, {Name, Length}} <- Generator])).
find_exist(Conn, Hash) ->
mongo_do(Conn, fun() ->
mongo:find_one(?COLLNAME, hash_selector(Hash))
end).
mongo_do(Conn, Fun) ->
mongo:do(safe, master, Conn, ?DBNAME, Fun).
% TODO: replace this with {'_id', ID}
hash_selector(Hash) ->
Expr = lists:flatten(io_lib:format("this._id == '~s'", [Hash])),
{'$where', list_to_binary(Expr)}.
decode_search_stats(Rets) ->
{Stats} = bson:lookup(stats, Rets),
{Found} = bson:lookup(nfound, Stats),
{Cost} = bson:lookup(timeMicros, Stats),
{Found, Cost}.
decode_search(Rets) ->
case bson:lookup(results, Rets) of
{} ->
[];
{List} ->
[decode_ret_item(Item) || Item <- List]
end.
decode_ret_item(Item) ->
{Torrent} = bson:lookup(obj, Item),
decode_torrent_item(Torrent).
decode_torrent_item(Torrent) ->
{BinHash} = bson:lookup('_id', Torrent),
Hash = binary_to_list(BinHash),
{BinName} = bson:lookup(name, Torrent),
Name = binary_to_list(BinName),
{Length} = bson:lookup(length, Torrent),
{CreatedT} = bson:lookup(created_at, Torrent),
ICreatedAt = round(CreatedT),
{Announce} = bson:lookup(announce, Torrent),
IA = round(Announce), % since announce maybe double in mongodb
case bson:lookup(files, Torrent) of
{{}} ->
{single, Hash, {Name, Length}, IA, ICreatedAt};
{Files} ->
{multi, Hash, {Name, decode_files(tuple_to_list(Files))}, IA, ICreatedAt}
end.
decode_files(Files) ->
decode_file(Files).
decode_file([_|[File|Rest]]) ->
{BinName} = bson:lookup(name, File),
Name = binary_to_list(BinName),
{Length} = bson:lookup(length, File),
[{Name, Length}] ++ decode_file(Rest);
decode_file([]) ->
[].
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
test_insert() ->
Conn = init(localhost, 27017),
insert(Conn, "7C6932E7EC1CF5B00AE991871E57B2375DADA5A0", "movie 1", 128, []),
insert(Conn, "AE94E340B5234C8410F37CFA7170F8C5657ECE5D", "another movie name", 0,
[{"subfile-a", 100}, {"subfile-b", 80}]),
insert(Conn, "0F1B5BE407E130AEEA8AB2964F5100190086ED93", "oh it work", 2456, []),
close(Conn).
test_content(Fun) ->
Conn = init(localhost, 27017),
Ret = Fun(Conn),
close(Conn),
Ret.
test_ensureidx() ->
test_content(fun(Conn) ->
enable_text_search(Conn),
ensure_search_index(Conn)
end).
test_search(Key) ->
test_content(fun(Conn) ->
search(Conn, Key)
end).
test_tmpsearch(Key) ->
test_content(fun(Conn) ->
BinColl = list_to_binary(atom_to_list(?COLLNAME)),
BinKey = list_to_binary(Key),
Ret = mongo_do(Conn, fun() ->
mongo:command({text, BinColl, search, BinKey})
end),
Ret
end).
test_count() ->
test_content(fun(Conn) ->
count(Conn)
end).
test_find_top() ->
test_content(fun(Conn) ->
search_announce_top(Conn, 2)
end).
test_announce() ->
Hash = "F79ED3E2BF29A5C4358202E88C9983AB479D7722",
test_content(fun(Conn) ->
inc_announce(Conn, Hash)
end).
test_index(Hash) ->
test_content(fun(Conn) ->
index(Conn, Hash)
end).

@@ -0,0 +1,100 @@
%%
%% db_system.erl
%% Kevin Lynx
%% 06.28.2013
%%
-module(db_system).
-export([load_batch_index/1,
inc_batch_rindex/1,
inc_batch_windex/1]).
-export([stats_new_saved/1,
stats_updated/1,
stats_query_inserted/2,
stats_day_at/2,
stats_get_peers/1]).
-define(DBNAME, dht_system).
-define(COLLNAME, system).
-define(HASH_BATCH_KEY, <<"hashbatch">>).
-define(STATS_COLLNAME, stats).
%% batch index
inc_batch_rindex(Conn) ->
inc_batch_index(Conn, read_index).
inc_batch_windex(Conn) ->
inc_batch_index(Conn, write_index).
inc_batch_index(Conn, Col) ->
Cmd = {findAndModify, ?COLLNAME, query, {'_id', ?HASH_BATCH_KEY},
update, {'$inc', {Col, 1}}, new, true},
mongo:do(safe, master, Conn, ?DBNAME, fun() ->
mongo:command(Cmd)
end).
load_batch_index(Conn) ->
Doc = case find_exist_batch_index(Conn) of
{} ->
NewDoc = create_batch_index(0, 0),
mongo:do(safe, master, Conn, ?DBNAME, fun() ->
mongo:insert(?COLLNAME, NewDoc)
end),
NewDoc;
{Exist} ->
Exist
end,
{RIndex} = bson:lookup(read_index, Doc),
{WIndex} = bson:lookup(write_index, Doc),
{RIndex, WIndex}.
find_exist_batch_index(Conn) ->
mongo:do(safe, master, Conn, ?DBNAME, fun() ->
mongo:find_one(?COLLNAME, {'_id', ?HASH_BATCH_KEY})
end).
create_batch_index(WIndex, RIndex) ->
{'_id', ?HASH_BATCH_KEY, read_index, WIndex, write_index, RIndex}.
%% stats collection
stats_new_saved(Conn) ->
stats_inc_field(Conn, new_saved).
stats_updated(Conn) ->
stats_inc_field(Conn, updated).
% a get_peers query that has actually been processed
stats_get_peers(Conn) ->
stats_inc_field(Conn, get_peers).
% all received get_peers queries, processed or not
stats_query_inserted(Conn, Count) ->
stats_inc_field(Conn, get_peers_query, Count).
stats_inc_field(Conn, Field) ->
stats_inc_field(Conn, Field, 1).
stats_inc_field(Conn, Field, Inc) ->
TodaySecs = time_util:now_day_seconds(),
mongo:do(unsafe, master, Conn, ?DBNAME, fun() ->
Doc = stats_ensure_today(TodaySecs),
{Val} = bson:lookup(Field, Doc),
NewDoc = bson:update(Field, Val + Inc, Doc),
mongo:update(?STATS_COLLNAME, {'_id', TodaySecs}, NewDoc)
end).
stats_day_at(Conn, DaySec) ->
mongo:do(safe, master, Conn, ?DBNAME, fun() ->
stats_ensure_today(DaySec)
end).
stats_ensure_today(TodaySecs) ->
case mongo:find_one(?STATS_COLLNAME, {'_id', TodaySecs}) of
{} ->
NewDoc = {'_id', TodaySecs, get_peers, 0, get_peers_query, 0,
updated, 0, new_saved, 0},
mongo:insert(?STATS_COLLNAME, NewDoc),
NewDoc;
{Doc} ->
Doc
end.

@@ -0,0 +1,9 @@
{application, dhtcrawler, [
{description, "A DHT crawler to index magnet hash to torrent"},
{vsn, git},
{registered, [dht_crawler_sup]},
{applications, [kernel, stdlib, crypto,
public_key, ssl, inets, bson, mongodb]},
{mod, {crawler_app, []}}
]}.

@@ -0,0 +1,182 @@
%%
%% db_hash_reader.erl
%% Kevin Lynx
%% 06.28.2013
%%
-module(db_hash_reader).
-compile(export_all).
-include("vlog.hrl").
-behaviour(gen_server).
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-export([start_link/1,
stop/1]).
-record(state, {downloader, dbpool, downloading = 0}).
-include("db_common.hrl").
% if there's no hash, wait some time
-define(WAIT_TIME, 2*60*1000).
% the max concurrent download tasks
-define(MAX_DOWNLOAD, 50).
start_link(DBPool) ->
gen_server:start_link(?MODULE, [DBPool], []).
stop(Pid) ->
gen_server:cast(Pid, stop).
%
init([DBPool]) ->
{ok, DownPid} = tor_download:start_link(),
tor_download_stats:register(DownPid),
{ok, #state{dbpool = DBPool, downloader = DownPid}, 0}.
terminate(_, State) ->
{ok, State}.
code_change(_, _, State) ->
{ok, State}.
handle_info({got_torrent, failed, _Hash}, State) ->
#state{downloading = D} = State,
Conn = db_conn(State),
try_next_download(Conn),
{noreply, State#state{downloading = D - 1}};
handle_info({got_torrent, ok, Hash, Content}, State) ->
#state{downloading = D} = State,
Conn = db_conn(State),
try_next_download(Conn),
case catch(torrent_file:parse(Content)) of
{'EXIT', _} ->
State;
{Type, Info} ->
got_torrent(State, Hash, Type, Info)
end,
{noreply, State#state{downloading = D - 1}};
handle_info(timeout, State) ->
Conn = db_conn(State),
try_next(Conn),
{noreply, State};
handle_info(_, State) ->
{noreply, State}.
handle_cast({process_hash, Doc, DownloadDoc}, State) ->
#state{downloader = DownPid} = State,
Conn = db_conn(State),
{Hash} = bson:lookup(hash, Doc),
ListHash = binary_to_list(Hash),
% avoid registering many timers when the hash collection is empty but the
% download collection is not; it also keeps the message queue from growing:
% each call removes one message and appends at most one more.
if DownloadDoc -> do_nothing; true -> try_next(Conn) end,
NewState = case db_store_mongo:inc_announce(Conn, ListHash) of
true ->
?T(?FMT("inc_announce success ~s", [ListHash])),
on_updated(Conn),
State;
false ->
?T(?FMT("start to download the torrent ~s", [ListHash])),
try_download(State, DownPid, ListHash, Doc)
end,
{noreply, NewState};
handle_cast(stop, State) ->
{stop, normal, State}.
handle_call(_, _From, State) ->
{noreply, State}.
try_download(State, Pid, Hash, Doc) ->
#state{downloading = D} = State,
Conn = db_conn(State),
NewDownloading = case D >= ?MAX_DOWNLOAD of
true -> % put it into the download queue
insert_to_download_wait(Conn, Doc),
D;
false -> % download it now
tor_download:download(Pid, Hash),
D + 1
end,
State#state{downloading = NewDownloading}.
try_save(State, Hash, Name, Length, Files) ->
Conn = db_conn(State),
case catch db_store_mongo:insert(Conn, Hash, Name, Length, Files) of
{'EXIT', Reason} ->
?E(?FMT("save torrent failed ~p", [Reason]));
_ ->
on_saved(Conn)
end.
on_saved(Conn) ->
% `get_peers' here means we have processed a request
db_system:stats_get_peers(Conn),
% increase the `new' counter
db_system:stats_new_saved(Conn),
hash_reader_stats:handle_insert().
on_updated(Conn) ->
% `get_peers' here means we have processed a request
db_system:stats_get_peers(Conn),
% also increase the updated counter
db_system:stats_updated(Conn),
hash_reader_stats:handle_update().
got_torrent(State, Hash, single, {Name, Length}) ->
try_save(State, Hash, Name, Length, []);
got_torrent(State, Hash, multi, {Root, Files}) ->
try_save(State, Hash, Root, 0, Files).
% insert the doc to the `wait-download' collection, and when the
% downloader is free, it will download this doc.
insert_to_download_wait(Conn, Doc) ->
{ID} = bson:lookup('_id', Doc),
Sel = {'_id', ID},
mongo:do(safe, master, Conn, ?HASH_DBNAME, fun() ->
% may exist already
mongo:update(?HASH_DOWNLOAD_COLL, Sel, Doc, true)
end).
try_next_download(Conn) ->
Doc = mongo:do(safe, master, Conn, ?HASH_DBNAME, fun() ->
D = mongo:find_one(?HASH_DOWNLOAD_COLL, {}),
delete_inner_doc(?HASH_DOWNLOAD_COLL, D),
D
end),
schedule_next(Doc, true).
try_next(Conn) ->
Doc = mongo:do(safe, master, Conn, ?HASH_DBNAME, fun() ->
D = mongo:find_one(?HASH_COLLNAME, {}),
delete_inner_doc(?HASH_COLLNAME, D),
D
end),
schedule_next(Doc, false).
delete_inner_doc(_Col, {}) ->
ok;
delete_inner_doc(Col, {Doc}) ->
{ID} = bson:lookup('_id', Doc),
mongo:delete(Col, {'_id', ID}).
schedule_next({}, true) ->
ok;
schedule_next({}, false) ->
timer:send_after(?WAIT_TIME, timeout);
schedule_next({Doc}, DownloadDoc) ->
gen_server:cast(self(), {process_hash, Doc, DownloadDoc}).
db_conn(State) ->
#state{dbpool = DBPool} = State,
mongo_pool:get(DBPool).

@@ -0,0 +1,63 @@
%%
%% db_hash_reader_sup.erl
%% Kevin Lynx
%% 06.29.2013
%%
-module(db_hash_reader_sup).
-behaviour(supervisor).
-export([init/1]).
-export([start_link/3,
start_dep_apps/0,
start_standalone/3,
start_standalone/1]).
start_dep_apps() ->
code:add_path("deps/bson/ebin"),
code:add_path("deps/mongodb/ebin"),
code:add_path("deps/kdht/ebin"),
code:add_path("deps/ibrowse/ebin"),
Apps = [asn1, crypto, public_key, ssl, inets, bson, mongodb],
[application:start(App) || App <- Apps].
start_standalone([IP, Port, Size]) ->
IPort = list_to_integer(Port),
ISize = list_to_integer(Size),
start_standalone(IP, IPort, ISize),
receive
fuck_erl_s_option -> ok
end.
start_standalone(IP, Port, Size) ->
io:format("db: ~p:~p reader count ~p~n", [IP, Port, Size]),
filelib:ensure_dir("log/"),
start_dep_apps(),
tor_download:start_global(),
% NOTE:
Stats = {db_hash_reader_stats, {hash_reader_stats, start_link, [Size]}, permanent, 2000, worker, [hash_reader_stats]},
DownloadStats = {tor_download_stats, {tor_download_stats, start_link, []}, permanent, 2000, worker, [tor_download_stats]},
Log = {vlog, {vlog, start_link, ["log/hash_reader.log", 3]}, permanent, 2000, worker, [vlog]},
start_link(IP, Port, Size, [DownloadStats, Stats, Log]).
start_link(IP, Port, Size) ->
start_link(IP, Port, Size, []).
start_link(IP, Port, Size, OtherProcess) ->
PoolName = mongodb_conn_pool_name,
mongo_sup:start_pool(PoolName, 5, {IP, Port}),
supervisor:start_link({local, srv_name()}, ?MODULE, [PoolName, Size, OtherProcess]).
srv_name() ->
?MODULE.
init([PoolName, Size, OtherProcess]) ->
Spec = {one_for_one, 1, 600},
Children = OtherProcess ++ [create_child(PoolName, Index) || Index <- lists:seq(1, Size)],
{ok, {Spec, Children}}.
create_child(PoolName, Index) ->
{child_id(Index), {db_hash_reader, start_link, [PoolName]},
permanent, 1000, worker, dynamic}.
child_id(Index) ->
list_to_atom(lists:flatten(io_lib:format("db_hash_reader_~p", [Index]))).

@@ -0,0 +1,136 @@
%%
%% hash_reader_stats.erl
%% Kevin Lynx
%% 06.29.2013
%%
-module(hash_reader_stats).
-behaviour(gen_server).
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-export([start_link/2,
start_link/1,
stop/0,
handle_update/0,
handle_insert/0,
dump/0]).
-record(state, {tref, count, start, name, updated = 0, inserted = 0}).
-define(STATS_INTERVAL, 10*60*1000).
-define(TEXT(Fmt, Arg), lists:flatten(io_lib:format(Fmt, Arg))).
start_link(Count) ->
start_link(Count, "reader_stats.txt").
start_link(Count, Name) ->
gen_server:start_link({local, srv_name()}, ?MODULE, [Count, Name], []).
stop() ->
gen_server:cast(srv_name(), stop).
dump() ->
gen_server:cast(srv_name(), dump).
handle_update() ->
gen_server:cast(srv_name(), inc_updated).
handle_insert() ->
gen_server:cast(srv_name(), inc_inserted).
srv_name() ->
?MODULE.
init([Count, Name]) ->
{ok, TRef} = timer:send_interval(?STATS_INTERVAL, dump),
State = #state{tref = TRef, count = Count, name = Name, start = now()},
{ok, State}.
terminate(_, State) ->
#state{tref = TRef} = State,
timer:cancel(TRef),
{ok, State}.
code_change(_, _, State) ->
{ok, State}.
handle_info(dump, State) ->
do_dump(State),
{noreply, State};
handle_info(_, State) ->
{noreply, State}.
handle_cast(dump, State) ->
do_dump(State),
{noreply, State};
handle_cast(inc_updated, State) ->
#state{updated = U} = State,
{noreply, State#state{updated = U + 1}};
handle_cast(inc_inserted, State) ->
#state{inserted = I} = State,
{noreply, State#state{inserted = I + 1}};
handle_cast(stop, State) ->
{stop, normal, State}.
handle_call(_, _From, State) ->
{noreply, State}.
do_dump(State) ->
Dir = "log/",
#state{name = Name} = State,
filelib:ensure_dir(Dir),
% this may block for a long time
DownloadStats = format_download_stats(),
{ok, FP} = file:open(Dir ++ Name, [append]),
io:format(FP, "~s~n", [date_string()]),
io:format(FP, "~p~n", [self()]),
io:format(FP, "~s~n", [format_stats(State)]),
io:format(FP, "~s~n", [DownloadStats]),
file:close(FP).
stats_time(Start) ->
DiffSecs = timer:now_diff(now(), Start) div 1000 div 1000,
% {day, {hour, min, sec}}
calendar:seconds_to_daystime(DiffSecs).
date_string() ->
{{Year, Month, Day}, {Hour, Min, Sec}} = erlang:localtime(),
lists:flatten(io_lib:format("~b-~2.10.0b-~2.10.0b ~2.10.0b:~2.10.0b:~2.10.0b",
[Year, Month, Day, Hour, Min, Sec])).
format_stats(State) ->
#state{count = C, start = Start, updated = U, inserted = I} = State,
{Day, {H, M, S}} = stats_time(Start),
Mins = Day * 24 * 60 + H * 60 + M,
TotalMins = if Mins > 0 -> Mins; true -> 1 end,
Speed = (U + I) div TotalMins,
?TEXT(" stats time ~b ~2.10.0b:~2.10.0b:~2.10.0b~n",
[Day, H, M, S]) ++
?TEXT(" Reader count ~p~n", [C]) ++
?TEXT(" Process speed ~p req/min~n", [Speed]) ++
?TEXT(" Download torrents speed ~p tor/min~n", [I div TotalMins]) ++
?TEXT(" Updated ~p~n", [U]) ++
?TEXT(" Inserted ~p~n", [I]).
format_download_stats() ->
Start = now(),
{ProcessCount, HashSum, ReqSum, TotalTime, CurrentReqCount} =
tor_download_stats:stats(),
TotalSecs = TotalTime div 1000,
Used = timer:now_diff(now(), Start) div 1000,
?TEXT(" ==== Torrent download stats ====~n", []) ++
?TEXT(" Stats used time ~p ms~n", [Used]) ++
?TEXT(" Downloader count ~p~n", [ProcessCount]) ++
?TEXT(" Request torrents ~p~n", [HashSum]) ++
?TEXT(" Http requests ~p~n", [ReqSum]) ++
?TEXT(" Total used time ~p secs~n", [TotalSecs]) ++
?TEXT(" Download speed ~p tor/secs~n", [TotalSecs div HashSum]) ++
?TEXT(" Current wait requests ~p~n", [CurrentReqCount]).

@@ -0,0 +1,81 @@
%%
%% crawler_http.erl
%% Kevin Lynx
%% 06.15.2013
%%
-module(crawler_http).
-behaviour(gen_server).
-export([init/1,
handle_info/2,
handle_cast/2,
handle_call/3,
code_change/3,
terminate/2]).
-export([start/0,
start/3,
page_temp/0,
stop/0]).
-record(state, {html_temp, httpid}).
start(DBHost, DBPort, Port) ->
code:add_path("deps/bson/ebin"),
code:add_path("deps/mongodb/ebin"),
Apps = [crypto, public_key, ssl, inets, bson, mongodb],
[application:start(App) || App <- Apps],
gen_server:start({local, srv_name()}, ?MODULE, [DBHost, DBPort, Port], []).
start() ->
start(localhost, 27017, 8000).
stop() ->
gen_server:cast(srv_name(), stop).
page_temp() ->
gen_server:call(srv_name(), get_page_temp).
srv_name() ->
crawler_http.
init([DBHost, DBPort, Port]) ->
torrent_index:start_link(DBHost, DBPort),
{ok, Pid} = inets:start(httpd, [
{modules, [mod_alias, mod_auth, mod_esi, mod_actions,
mod_cgi, mod_dir, mod_get, mod_head, mod_log, mod_disk_log]},
{port, Port},
{bind_address, {0, 0, 0, 0}},
{server_name, "crawler_http"},
{document_root, "www"},
{server_root, "."},
{directory_index, ["index.html"]},
{erl_script_alias, {"/e", [http_handler]}},
{mime_types, [{"html","text/html"},
{"css","text/css"}, {"js","application/x-javascript"}]}]),
{ok, B} = file:read_file("www/page.temp"),
Html = binary_to_list(B),
{ok, #state{html_temp = Html, httpid = Pid}}.
handle_call(get_page_temp, _From, State) ->
#state{html_temp = Html} = State,
{reply, Html, State};
handle_call(_, _From, State) ->
{noreply, State}.
handle_cast(stop, State) ->
{stop, normal, State};
handle_cast(_, State) ->
{noreply, State}.
terminate(_, State) ->
#state{httpid = Pid} = State,
torrent_index:stop(),
inets:stop(httpd, Pid),
{ok, State}.
code_change(_, _, State) ->
{ok, State}.
handle_info(_, State) ->
{noreply, State}.

@@ -0,0 +1,142 @@
%%
%% http_handler.erl
%% Kevin Lynx
%% 06.18.2013
%%
-module(http_handler).
-export([search/3,
test_search/1,
index/3,
stats/3,
recent/3,
top/3]).
-define(TEXT(Fmt, Args), lists:flatten(io_lib:format(Fmt, Args))).
-import(torrent_file, [size_string/1]).
-define(CONTENT_TYPE, "Content-Type: text/html\r\n\r\n").
search(SessionID, _Env, Input) ->
{K, Body} = case get_search_keyword(Input) of
[] ->
{"", "invalid input"};
Key ->
{Key, do_search(Key)}
end,
Response = simple_html(K, Body),
mod_esi:deliver(SessionID, [?CONTENT_TYPE, Response]).
top(SessionID, _Env, _Input) ->
Rets = torrent_index:top(),
BodyList = format_search_result(Rets),
Body = ?TEXT("<ol>~s</ol>", [lists:flatten(BodyList)]),
Response = simple_html("top", Body),
mod_esi:deliver(SessionID, [?CONTENT_TYPE, Response]).
recent(SessionID, _Env, _Input) ->
Rets = torrent_index:recent(),
BodyList = format_search_result(Rets),
Body = ?TEXT("<ol>~s</ol>", [lists:flatten(BodyList)]),
Response = simple_html("recent", Body),
mod_esi:deliver(SessionID, [?CONTENT_TYPE, Response]).
stats(SessionID, _Env, _Input) ->
Body = ?TEXT("total ~p torrents", [torrent_index:count()]),
Response = simple_html("", Body),
mod_esi:deliver(SessionID, [?CONTENT_TYPE, Response]).
index(SessionID, _Env, Input) ->
Body = case get_index_hash(Input) of
[] ->
"invalid hash";
Hash ->
format_view(Hash)
end,
Response = simple_html("", Body),
mod_esi:deliver(SessionID, [?CONTENT_TYPE, Response]).
get_search_keyword(Input) ->
case string:equal(string:substr(Input, 1, 2), "q=") of
true ->
urldecode:decode(string:substr(Input, 3));
false ->
[]
end.
get_index_hash(Input) ->
case string:equal(string:substr(Input, 1, 2), "q=") of
true ->
string:substr(Input, 3);
false ->
[]
end.
simple_html(Key, Body) ->
?TEXT(crawler_http:page_temp(), [Key, Body]).
test_search(Keyword) ->
Filename = ?TEXT("search_~s.html", [Keyword]),
Body = do_search(Keyword),
file:write_file(Filename, simple_html(Keyword, Body)).
do_search(Keyword) ->
{Rets, Stats} = torrent_index:search(Keyword),
{_Found, Cost} = Stats,
Tip = ?TEXT("<h4>search ~s, ~b results, ~f seconds</h4>",
[Keyword, length(Rets), Cost / 1000 / 1000]),
BodyList = format_search_result(Rets),
Body = ?TEXT("<ol>~s</ol>", [lists:flatten(BodyList)]),
Tip ++ Body.
format_search_result(RetList) ->
[format_one_result(Result, false) || Result <- RetList].
format_one_result({single, Hash, {Name, Length}, Announce, CTime}, ShowAll) ->
format_one_result(Hash, Name, [{Name, Length}], Announce, CTime, ShowAll);
format_one_result({multi, Hash, {Name, Files}, Announce, CTime}, ShowAll) ->
format_one_result(Hash, Name, Files, Announce, CTime, ShowAll).
format_one_result(Hash, Name, Files, Announce, CTime, ShowAll) ->
?TEXT("<li><p style=\"font-size: 120%;\">
<a target='_blank' href=\"/e/http_handler:index?q=~s\">~s</a></p><ul>~s</ul>",
[Hash, Name, format_files(Files, ShowAll)]) ++
?TEXT("<p style=\"font-size:80%\">Index at: ~s | File count: ~p | Announce count: ~p
<a href=\"~s\" style=\"font-size:120%\"> Download</a></p>",
[format_time_string(CTime), length(Files), Announce, format_magnet(Hash)]).
format_files(Files, false) ->
Sub = case length(Files) > 3 of
true ->
lists:sublist(Files, 3) ++ [{more, length(Files) - 3}];
false ->
Files
end,
lists:flatten([format_file(File) || File <- Sub]);
format_files(Files, true) ->
lists:flatten([format_file(File) || File <- Files]).
format_file({more, Len}) ->
?TEXT("<li>...~b more files</li>", [Len]);
format_file({Name, Length}) ->
?TEXT("<li>~s <span style=\"color:#888;\">~s</span></li>",
[Name, size_string(Length)]).
format_view(Hash) ->
case torrent_index:index(Hash) of
{} -> "not found";
Torrent ->
format_torrent_detail(Torrent)
end.
format_torrent_detail(Torrent) ->
format_one_result(Torrent, true).
format_magnet(MagHash) ->
"magnet:?xt=urn:btih:" ++ MagHash.
format_time_string(Secs) ->
{{Y, M, D}, {H, Min, Sec}} = time_util:seconds_to_local_time(Secs),
?TEXT("~b-~2..0b-~2..0b ~2..0b:~2..0b:~2..0b",
[Y, M, D, H, Min, Sec]).

@@ -0,0 +1,29 @@
%%
%% urldecode.erl
%%
-module(urldecode).
-export([decode/1]).
-compile(export_all).
decode([$%, Hi, Lo | Tail]) ->
Hex = unhexdigit(Hi, Lo),
[Hex | decode(Tail)];
decode([$?|T]) ->
[$?|T];
decode([H|T]) when is_integer(H) ->
[H |decode(T)];
decode([]) ->
[];
decode([H|T]) when is_list(H) ->
[decode(H) | decode(T)].
unhexdigit(C) when C >= $0, C =< $9 -> C - $0;
unhexdigit(C) when C >= $a, C =< $f -> C - $a + 10;
unhexdigit(C) when C >= $A, C =< $F -> C - $A + 10.
unhexdigit(HiCh, LoCh) ->
unhexdigit(LoCh) bor (unhexdigit(HiCh) bsl 4).
test(S) ->
R = decode(S),
file:write_file("decode.txt", R).

@@ -0,0 +1,186 @@
%%
%% tor_download.erl
%% Kevin Lynx
%% 06.30.2013
%%
-module(tor_download).
-include("vlog.hrl").
-behaviour(gen_server).
-export([init/1,
handle_info/2,
handle_cast/2,
handle_call/3,
code_change/3,
terminate/2]).
-export([start_global/0,
start_link/0,
stop/1,
download/2,
stats/1]).
-export([test/2]).
-define(HTTP_SESSION, 5000).
-define(HTTP_PIPELINE, 1000).
-define(REQ_TIMEOUT, 60*1000).
-record(state, {start, hashSum = 0, reqSum = 0, totalTime = 0, reqs}).
start_global() ->
ibrowse:start(),
Options = [{max_sessions, ?HTTP_SESSION}, {max_pipeline_size, ?HTTP_PIPELINE}],
% does not seem to take effect here?
[ibrowse:set_dest(Host, 80, Options) || Host <- get_req_hosts()],
ok.
start_link() ->
gen_server:start_link(?MODULE, [], []).
stop(Pid) ->
gen_server:cast(Pid, stop).
download(Pid, MagHash) when is_list(MagHash), length(MagHash) == 40 ->
gen_server:cast(Pid, {download, MagHash, self()}).
%{HashSum, ReqSum, TotalTime, CurrentReqCount}
stats(Pid) ->
gen_server:call(Pid, get_stats, infinity).
init([]) ->
{ok, #state{start = now(), reqs = gb_trees:empty()}}.
handle_cast({download, MagHash, From}, State) ->
#state{reqs = Reqs, hashSum = H, reqSum = R} = State,
NewReqs = create_download(Reqs, MagHash, From),
{noreply, State#state{reqs = NewReqs, hashSum = H + 1, reqSum = R + 1}};
handle_cast(stop, State) ->
{stop, normal, State};
handle_cast(_, State) ->
{noreply, State}.
terminate(_, State) ->
{ok, State}.
code_change(_, _, State) ->
{ok, State}.
handle_response(State, ReqID, Body) ->
#state{reqSum = R, totalTime = T, reqs = Reqs} = State,
{MagHash, URLs, From, Start} = gb_trees:get(ReqID, Reqs),
NewT = T + timer:now_diff(now(), Start) div 1000, % milliseconds
{NewReqS, NewReqs} = case unzip_content(Body) of
error ->
handle_next_req(MagHash, URLs, From, R, ReqID, Reqs);
Content ->
{R, handle_ok_response(MagHash, Content, From, ReqID, Reqs)}
end,
State#state{reqSum = NewReqS, reqs = NewReqs, totalTime = NewT}.
handle_info({ibrowse_async_response, ReqID, Body}, State) ->
#state{reqs = Reqs} = State,
NewState = case gb_trees:is_defined(ReqID, Reqs) of
true ->
handle_response(State, ReqID, Body);
false ->
?E(?FMT("not found req ~p , reqs count ~p", [ReqID, gb_trees:size(Reqs)])),
State
end,
{noreply, NewState};
handle_info(_, State) ->
{noreply, State}.
handle_call(get_stats, _From, State) ->
#state{hashSum = H, reqSum = R, totalTime = T, reqs = Reqs} = State,
{reply, {H, R, T, gb_trees:size(Reqs)}, State};
handle_call(_, _From, State) ->
{noreply, State}.
%%
handle_next_req(MagHash, URLs, From, ReqSum, ReqID, Reqs) ->
DelReqs = gb_trees:delete(ReqID, Reqs),
case request_next(URLs) of
{error, empty} ->
From ! {got_torrent, failed, MagHash},
{ReqSum, DelReqs};
{ok, NewReqID, NewURLs, Time} ->
NewReq = {MagHash, NewURLs, From, Time},
{ReqSum + 1, gb_trees:insert(NewReqID, NewReq, DelReqs)}
end.
handle_ok_response(MagHash, Content, From, ReqID, Reqs) ->
From ! {got_torrent, ok, MagHash, Content},
gb_trees:delete(ReqID, Reqs).
create_download(Reqs, MagHash, From) ->
URLs = create_req_urls(MagHash),
case request_next(URLs) of
{ok, ReqID, NewURLs, Time} ->
Req = {MagHash, NewURLs, From, Time},
gb_trees:insert(ReqID, Req, Reqs);
{error, empty} -> % exception
From ! {got_torrent, failed, MagHash},
Reqs
end.
request_next([]) ->
{error, empty};
request_next([URL|T]) ->
SSL = is_ssl_url(URL),
Options = [{is_ssl, SSL}, {ssl_options, []}, {stream_to, self()},
{max_sessions, ?HTTP_SESSION}, {max_pipeline_size, ?HTTP_PIPELINE}],
case ibrowse:send_req(URL, [], get, [], Options, ?REQ_TIMEOUT) of
{ibrowse_req_id, ReqID} ->
{ok, ReqID, T, now()};
Reason ->
?E(?FMT("ibrowse send_req failed ~p", [Reason])),
request_next(T)
end.
%%
unzip_content(B) when is_list(B) ->
unzip_content(list_to_binary(B));
unzip_content(B) when is_binary(B), byte_size(B) > 0 ->
case catch(zlib:gunzip(B)) of
{'EXIT', _} -> error;
Res -> Res
end;
unzip_content(_B) ->
error.
%% http stuff
get_req_hosts() ->
["http://bt.box.n0808.com",
"http://torcache.net",
"http://zoink.it"].
create_req_urls(MagHash) when is_list(MagHash), length(MagHash) == 40 ->
U1 = "http://torcache.net/torrent/" ++ MagHash ++ ".torrent",
U2 = format_btbox_url(MagHash),
% zoink.it supports https, but the ssl library seems to leak memory, so use http
U3 = "http://zoink.it/torrent/" ++ MagHash ++ ".torrent",
[U1, U2, U3].
is_ssl_url(URL) when is_list(URL), length(URL) > 4 ->
string:substr(URL, 1, 5) == "https".
format_btbox_url(MagHash) ->
H = lists:sublist(MagHash, 2),
T = lists:nthtail(38, MagHash),
"http://bt.box.n0808.com/" ++ H ++ "/" ++ T ++ "/" ++ MagHash ++ ".torrent".
%%
test(Pid, MagHash) ->
tor_download:download(Pid, MagHash),
filelib:ensure_dir("torrents/"),
Name = "torrents/" ++ MagHash ++ ".torrent",
receive
{got_torrent, ok, _, Content} ->
file:write_file(Name, Content);
{got_torrent, failed, _} ->
failed
after 8000 ->
timeout
end.
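% A minimal usage sketch (the hash below is hypothetical); the caller must
% be the process that receives the reply messages, as in test/2 above:
%   {ok, Pid} = start_link(),
%   download(Pid, "0123456789ABCDEF0123456789ABCDEF01234567"),
%   receive
%       {got_torrent, ok, _Hash, Content} -> Content;
%       {got_torrent, failed, _Hash} -> failed
%   end.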

@ -0,0 +1,70 @@
%%
%% tor_download_stats.erl
%% Kevin Lynx
%% 06.30.2013
%%
-module(tor_download_stats).
-behaviour(gen_server).
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-export([start_link/0,
stop/0,
register/1,
stats/0]).
-record(state, {pids = []}).
start_link() ->
gen_server:start_link({local, srv_name()}, ?MODULE, [], []).
stop() ->
gen_server:cast(srv_name(), stop).
% {ProcessCount, HashSum, ReqSum, TotalTime, CurrentReqCount}
stats() ->
gen_server:call(srv_name(), stats, infinity).
register(Pid) ->
gen_server:cast(srv_name(), {register, Pid}).
srv_name() ->
?MODULE.
%%
init([]) ->
{ok, #state{}}.
terminate(_, State) ->
{ok, State}.
code_change(_, _, State) ->
{ok, State}.
handle_info(_, State) ->
{noreply, State}.
handle_cast({register, Pid}, State) ->
#state{pids = Pids} = State,
{noreply, State#state{pids = [Pid|Pids]}};
handle_cast(stop, State) ->
{stop, normal, State}.
handle_call(stats, _From, State) ->
#state{pids = Pids} = State,
{reply, stats(Pids), State};
handle_call(_, _From, State) ->
{reply, not_implemented, State}. % reply so unknown callers do not hang
stats(Pids) ->
% {ProcessCount, HashSum, ReqSum, TotalTime, CurrentReqCount}
Info = {0, 0, 0, 0, 0},
lists:foldl(fun(Pid, {PSum, HashSum, ReqSum, TimeSum, CurReqCount}) ->
{ThisH, ThisReq, ThisTime, ThisCurReq} = tor_download:stats(Pid),
{PSum + 1, HashSum + ThisH, ReqSum + ThisReq, TimeSum + ThisTime, CurReqCount + ThisCurReq}
end, Info, Pids).
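% A usage sketch: each tor_download worker registers itself once, then
% stats/0 folds their counters into one tuple (field order as documented
% above). Registered pids are assumed to be alive; a dead pid would make
% the gen_server:call inside tor_download:stats/1 exit.
%   {ok, _} = tor_download_stats:start_link(),
%   {ok, Pid} = tor_download:start_link(),
%   tor_download_stats:register(Pid),
%   {Procs, Hashes, Reqs, Time, Cur} = tor_download_stats:stats().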

@ -0,0 +1,94 @@
%%
%% torrent_file.erl
%% Kevin Lynx
%% 06.12.2013
%% parse a torrent file; depends on the bencode module.
%%
-module(torrent_file).
-export([parse_from_file/1,
parse/1,
size_brief/1,
size_string/1]).
parse_from_file(File) ->
{ok, Data} = file:read_file(File),
parse(Data).
% Format:
% {single, {name, length}}
% {multi, {root, [{path, length}, {path, length}]}}
parse(Content) ->
{ok, {dict, TD}} = bencode:decode(Content),
{ok, {dict, Info}} = dict:find(<<"info">>, TD),
case type(Info) of
single -> {single, parse_single(Info)};
multi -> {multi, parse_multi(Info)}
end.
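% For example, a hypothetical single-file torrent decodes to
%   {single, {"ubuntu.iso", 734003200}}
% and a hypothetical multi-file one to
%   {multi, {"album", [{"cd1/01.mp3", 5242880}, {"cd1/02.mp3", 4194304}]}}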
size_brief(SizeInBytes) ->
KB = 1024,
MB = KB * 1024,
GB = MB * 1024,
TB = GB * 1024,
if
SizeInBytes < KB -> {byte, SizeInBytes};
SizeInBytes < MB -> {kb, SizeInBytes div KB};
SizeInBytes < GB -> {mb, SizeInBytes div MB};
SizeInBytes < TB -> {gb, SizeInBytes div GB};
true -> {byte, SizeInBytes} % >= 1 TB falls back to raw bytes
end.
size_string(SizeInBytes) ->
{T, N} = size_brief(SizeInBytes),
lists:flatten(io_lib:format("~b ~s", [N, atom_to_list(T)])).
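% Worked examples:
%   size_brief(2048)            -> {kb, 2}
%   size_brief(3 * 1024 * 1024) -> {mb, 3}
%   size_string(2048)           -> "2 kb"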
type(Info) ->
case dict:find(<<"files">>, Info) of
{ok, {list, _Files}} -> multi;
_ -> single
end.
parse_single(Info) ->
Name = read_string("name", Info),
{ok, Length} = dict:find(<<"length">>, Info),
{Name, Length}.
parse_multi(Info) ->
Root = read_string("name", Info),
{ok, {list, Files}} = dict:find(<<"files">>, Info),
FileInfo = [parse_file_item(Item) || {dict, Item} <- Files],
{Root, FileInfo}.
parse_file_item(F) ->
{ok, Length} = dict:find(<<"length">>, F),
Path = read_path(F),
{Path, Length}.
read_string(Key, Dict) ->
% prefer utf8
case dict:find(list_to_binary(Key++".utf-8"), Dict) of
{ok, UTF8BS} ->
binary_to_list(UTF8BS);
_ ->
{ok, BS} = dict:find(list_to_binary(Key), Dict),
binary_to_list(BS)
end.
read_path(F) ->
case dict:find(<<"path.utf-8">>, F) of
{ok, {list, Paths}} ->
concat_path(Paths);
_ ->
{ok, {list, Paths}} = dict:find(<<"path">>, F),
concat_path(Paths)
end.
concat_path(Paths) ->
AppendSlash = fun(BS, Acc) ->
case Acc of
[] -> binary_to_list(BS);
_ -> Acc ++ "/" ++ binary_to_list(BS)
end
end,
lists:foldl(AppendSlash, "", Paths).
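% A usage sketch, assuming a hypothetical multi-file torrent on disk:
%   {multi, {Root, Files}} = torrent_file:parse_from_file("a.torrent"),
%   [io:format("~s/~s ~s~n", [Root, Path, torrent_file:size_string(Len)])
%    || {Path, Len} <- Files].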

@ -0,0 +1,18 @@
## dhtcrawler2
This git branch maintains pre-compiled Erlang files so you can start dhtcrawler2 directly.
## Usage
* install Erlang R16B or newer
* download MongoDB and start it first
mongod --dbpath your-database-path --setParameter textSearchEnabled=true
* start the **crawler**; on Windows, just run `win_start_crawler.bat`
* start the **hash_reader**; on Windows, just run `win_start_hash.bat`
* start the **httpd**; on Windows, just run `win_start_http.bat`
* wait several minutes and then check out `localhost:8000`

@ -0,0 +1,16 @@
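rem Package the compiled beams, deps, www assets and tools into bin\
rem (assumes it is run from the tools\ directory, hence the cd ..)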
cd ..
mkdir bin\deps\bson\ebin
mkdir bin\deps\mongodb\ebin
mkdir bin\deps\kdht\ebin
mkdir bin\deps\ibrowse\ebin
copy deps\bson\ebin\*.* bin\deps\bson\ebin\
copy deps\mongodb\ebin\*.* bin\deps\mongodb\ebin\
copy deps\kdht\ebin\*.* bin\deps\kdht\ebin\
copy deps\ibrowse\ebin\*.* bin\deps\ibrowse\ebin\
mkdir bin\www
copy www\*.* bin\www\
copy tools\*.* bin\
mkdir bin\priv
mkdir bin\ebin
copy ebin\*.* bin\ebin\
pause

@ -0,0 +1,2 @@
erl -pa ebin -noshell -s crawler_app start

@ -0,0 +1 @@
erl -pa ebin -noshell -s db_hash_reader_sup start_standalone localhost 27017 10

@ -0,0 +1,2 @@
erl -pa ebin -noshell -s crawler_http start

@ -0,0 +1,66 @@
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>Yet Another Magnet Search Engine</title>
<style type="text/css" media="screen">
body { margin:0; padding:0; }
#wrap {
width: 980px;
margin: 5px auto;
}
#header {
height: 30px;
}
#main {
margin: 20px 10px;
}
#content {
min-height: 130px;
padding: 20px 5px;
}
#footer{
text-align:center;
font-size: 13px;
margin-top: 20px;
padding-top: 5px;
border-top: 1px solid #888;
}
.search-box #s_text {
width: 200px;
}
</style>
</head>
<body>
<div id="wrap">
<div id="header">
<h2><a style="text-decoration:none;" href="/">Magnet Search Engine</a></h2>
</div>
<div id="main">
<div class="search-box">
<form id="s_box" method="get" action="/e/http_handler:search" >
<input id="s_text" name="q" type="text" />
<input id="s_submit" value="Search" type="submit" />
</form>
</div>
<div id="content">
<p>
Try <a href="/e/http_handler:search?q=avi">AVI</a>
&nbsp;<a href="/e/http_handler:search?q=american">American</a>
&nbsp;<a href="/e/http_handler:search?q=iron+man">Iron Man</a>
</p>
</div>
<div id="footer">
<a href="/e/http_handler:recent">New</a>
<a href="/e/http_handler:top">Top</a>
<a href="/e/http_handler:stats">Stats</a>
<span> &copy; Kevin Lynx 2013</span>
</div>
<script type="text/javascript">
document.getElementsByTagName('input')[0].focus();
</script>
</div>
</div>
</body>
</html>

@ -0,0 +1,59 @@
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>Yet Another Magnet Search Engine</title>
<style type="text/css" media="screen">
body { margin:0; padding:0; }
#wrap {
width: 980px;
margin: 5px auto;
}
#header {
height: 30px;
}
#main {
margin: 20px 10px;
}
#content {
min-height: 130px;
padding: 20px 5px;
}
#footer{
text-align:center;
font-size: 13px;
margin-top: 20px;
padding-top: 5px;
border-top: 1px solid #888;
}
.search-box #s_text {
width: 200px;
}
</style>
</head>
<body>
<div id="wrap">
<div id="header">
<h2><a style="text-decoration:none;" href="/">Magnet Search Engine</a></h2>
</div>
<div id="main">
<div class="search-box">
<form id="s_box" method="get" action="/e/http_handler:search" >
<input id="s_text" value="~s" name="q" type="text" />
<input id="s_submit" value="Search" type="submit" />
</form>
</div>
<div id="content">
~s
</div>
<div id="footer">
<a href="/e/http_handler:recent">New</a>
<a href="/e/http_handler:top">Top</a>
<a href="/e/http_handler:stats">Stats</a>
<span> &copy; Kevin Lynx 2013</span>
</div>
</div>
</div>
</body>
</html>