From a0be5160dc8d744df03735ef56fe921e76042c10 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alo=C3=AFs=20Micard?= Date: Mon, 21 Sep 2020 16:40:12 +0200 Subject: [PATCH] Start implementing new architecture --- .dockerignore | 8 ++ ...kerfile-persister => Dockerfile-extractor} | 6 +- .../tdsh-extractor.go} | 4 +- deployments/docker/docker-compose.yml | 4 +- docs/architecture.md | 45 ++++++++ go.mod | 1 + go.sum | 54 +++++++++ internal/api/api.go | 108 +++++------------- internal/crawler/crawler.go | 23 +--- .../persister.go => extractor/extractor.go} | 35 ++---- internal/extractor/extractor_test.go | 1 + internal/scheduler/scheduler.go | 2 +- internal/util/nats/message.go | 5 + internal/util/nats/nats.go | 8 ++ pkg/proto/proto.go | 23 +++- scripts/build.sh | 8 +- 16 files changed, 200 insertions(+), 135 deletions(-) create mode 100644 .dockerignore rename build/docker/{Dockerfile-persister => Dockerfile-extractor} (69%) rename cmd/{tdsh-persister/tdsh-persister.go => tdsh-extractor/tdsh-extractor.go} (56%) create mode 100644 docs/architecture.md rename internal/{persister/persister.go => extractor/extractor.go} (58%) create mode 100644 internal/extractor/extractor_test.go create mode 100644 internal/util/nats/message.go diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..2b632c8 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,8 @@ +.git +LICENSE +README.md +snapcraft.yaml +deployments +docs +scripts +build diff --git a/build/docker/Dockerfile-persister b/build/docker/Dockerfile-extractor similarity index 69% rename from build/docker/Dockerfile-persister rename to build/docker/Dockerfile-extractor index 4c26406..74f4774 100644 --- a/build/docker/Dockerfile-persister +++ b/build/docker/Dockerfile-extractor @@ -13,12 +13,12 @@ RUN go mod download COPY . . 
# Test then build app -RUN go build -v github.com/creekorful/trandoshan/cmd/tdsh-persister +RUN go build -v github.com/creekorful/trandoshan/cmd/tdsh-extractor # runtime image FROM alpine:latest -COPY --from=builder /app/tdsh-persister /app/ +COPY --from=builder /app/tdsh-extractor /app/ WORKDIR /app/ -ENTRYPOINT ["./tdsh-persister"] \ No newline at end of file +ENTRYPOINT ["./tdsh-extractor"] \ No newline at end of file diff --git a/cmd/tdsh-persister/tdsh-persister.go b/cmd/tdsh-extractor/tdsh-extractor.go similarity index 56% rename from cmd/tdsh-persister/tdsh-persister.go rename to cmd/tdsh-extractor/tdsh-extractor.go index 0482544..624d398 100644 --- a/cmd/tdsh-persister/tdsh-persister.go +++ b/cmd/tdsh-extractor/tdsh-extractor.go @@ -1,12 +1,12 @@ package main import ( - "github.com/creekorful/trandoshan/internal/persister" + "github.com/creekorful/trandoshan/internal/extractor" "os" ) func main() { - app := persister.GetApp() + app := extractor.GetApp() if err := app.Run(os.Args); err != nil { os.Exit(1) } diff --git a/deployments/docker/docker-compose.yml b/deployments/docker/docker-compose.yml index f0c7766..5a0f6c5 100644 --- a/deployments/docker/docker-compose.yml +++ b/deployments/docker/docker-compose.yml @@ -39,8 +39,8 @@ services: restart: always depends_on: - nats - persister: - image: creekorful/tdsh-persister:latest + extractor: + image: creekorful/tdsh-extractor:latest command: --log-level debug --nats-uri nats --api-uri http://api:8080 restart: always depends_on: diff --git a/docs/architecture.md b/docs/architecture.md new file mode 100644 index 0000000..524f98c --- /dev/null +++ b/docs/architecture.md @@ -0,0 +1,45 @@ +# Crawler + +The crawler is the central process of Trandoshan. +It consumes URLs, crawls them (following redirects, etc.) and publishes the page body. + +## Consumes + +- URL (url.todo) + +## Produces + +- Resource (resource.new) + +# Extractor + +The extractor is the data extraction process of Trandoshan. 
+It consumes crawled resources, extracts data (URLs, metadata, etc.) from them, +stores it in an ES instance (by calling the API), and publishes the found URLs. + +## Consumes + +- Resource (resource.new) + +## Produces + +- URL (url.found) +- Metadata +- Body + +# Scheduler + +The scheduler is the process responsible for scheduling the crawl. +It determines which URLs should be crawled and publishes them. + +## Consumes + +- URL (url.found) + +## Produces + +- URL (url.todo) + +# API + +The API process is mainly used to get data from ES. \ No newline at end of file diff --git a/go.mod b/go.mod index 4c30554..367bc36 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/labstack/echo/v4 v4.1.16 github.com/nats-io/nats-server/v2 v2.1.8 // indirect github.com/nats-io/nats.go v1.10.0 + github.com/olivere/elastic/v7 v7.0.20 github.com/rs/zerolog v1.20.0 github.com/urfave/cli/v2 v2.2.0 github.com/valyala/fasthttp v1.9.0 diff --git a/go.sum b/go.sum index bb338f8..e977538 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,11 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/PuerkitoBio/purell v1.1.1 h1:WEQqlqaGbrPkxLJWfBwQmfEAE1Z7ONdDLqrN38tNFfI= github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 h1:d+Bc7a5rLufV/sSk/8dngufqelfh6jnri85riMAaF/M= github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= +github.com/aws/aws-sdk-go v1.34.13/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d 
h1:U+s90UTSYgptZMwQh2aRr3LuazLJIa+Pg3Kc1ylSYVY= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= @@ -11,6 +14,13 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/elastic/go-elasticsearch/v7 v7.6.0 h1:sYpGLpEFHgLUKLsZUBfuaVI9QgHjS3JdH9fX4/z8QI8= github.com/elastic/go-elasticsearch/v7 v7.6.0/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= +github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= @@ -22,6 +32,10 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.2/go.mod 
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik= +github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= +github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/klauspost/compress v1.8.2 h1:Bx0qjetmNjdFXASH02NSAREKpiaDwkO1DRZ3dV2KCcs= github.com/klauspost/compress v1.8.2/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/klauspost/cpuid v1.2.1 h1:vJi+O/nMdFt0vqm8NZBI6wzALWdA2X+egi0ogNyrC/w= @@ -30,6 +44,8 @@ github.com/labstack/echo/v4 v4.1.16 h1:8swiwjE5Jkai3RPfZoahp8kjVCRNq+y7Q0hPji2Kz github.com/labstack/echo/v4 v4.1.16/go.mod h1:awO+5TzAjvL8XpibdsfXxPgHr+orhtXZJZIQCVjogKI= github.com/labstack/gommon v0.3.0 h1:JEeO0bvc78PKdyHxloTKiF8BD5iGrH8T6MSeGvSgob0= github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k= +github.com/mailru/easyjson v0.7.6 h1:8yTIVnZgCoiM1TgqoeTl+LfU5Jg6/xL3QhGQnimLYnA= +github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= github.com/mattn/go-colorable v0.1.6 h1:6Su7aK7lXmJ/U79bYtBjLNaha4Fs1Rg9plHpcH+vvnE= github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= @@ -48,7 +64,14 @@ github.com/nats-io/nkeys v0.1.4 h1:aEsHIssIk6ETN5m2/MD8Y4B2X7FfXrBAUdkyRvbVYzA= github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/olivere/elastic v1.0.1 h1:UeafjZg+TifCVPhCJNPof0pUHig6vbXuJEbC/A+Ouo0= +github.com/olivere/elastic v6.2.35+incompatible h1:MMklYDy2ySi01s123CB2WLBuDMzFX4qhFcA5tKWJPgM= +github.com/olivere/elastic/v7 v7.0.20 h1:5FFpGPVJlBSlWBOdict406Y3yNTIpVpAiUvdFZeSbAo= 
+github.com/olivere/elastic/v7 v7.0.20/go.mod h1:Kh7iIsXIBl5qRQOBFoylCsXVTtye3keQU2Y/YbR7HD8= +github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= @@ -58,9 +81,14 @@ github.com/russross/blackfriday/v2 v2.0.1 h1:lPqVAte+HuHNfhJ/0LC98ESWRz8afy9tM/0 github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5IYyJwS/kOiWx8mHo= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= +github.com/smartystreets/assertions v1.1.1/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo= +github.com/smartystreets/go-aws-auth v0.0.0-20180515143844-0c1422d1fdb9/go.mod h1:SnhjPscd9TpLiy1LpzGSKh3bXCfxxXuqd9xmQJy3slM= +github.com/smartystreets/gunit v1.4.2/go.mod h1:ZjM1ozSIMJlAz/ay4SG8PeKF00ckUp+zMHZXV9/bvak= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/urfave/cli/v2 v2.2.0 h1:JTTnM6wKzdA0Jqodd966MVj4vWbbquZykeX1sKbe2C4= github.com/urfave/cli/v2 v2.2.0/go.mod 
h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2caLfQ= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= @@ -71,21 +99,37 @@ github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPU github.com/valyala/fasttemplate v1.1.0 h1:RZqt0yGBsps8NGvLSGW804QQqCUYYLsaOjTVHy1Ocw4= github.com/valyala/fasttemplate v1.1.0/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8= github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio= +go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59 h1:3zb4D3T4G8jdExgVU/95+vQXfpEPiMdCaZgmGVxjNHM= golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod 
h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190827160401-ba9fcec4b297 h1:k7pJ2yAPLPgbskkFdhRCsA77k2fySZ1zf2zCjvQCiIM= golang.org/x/net v0.0.0-20190827160401-ba9fcec4b297/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b h1:0mm1VjtFUOIlE1SbDlwjYaDxZVDP2S5ou6y0gSgXHu8= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys 
v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -96,10 +140,19 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190828213141-aed303cbaa74/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod 
h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= @@ -112,5 +165,6 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= mvdan.cc/xurls/v2 v2.1.0 h1:KaMb5GLhlcSX+e+qhbRJODnUUBvlw01jt4yrjFIHAuA= mvdan.cc/xurls/v2 v2.1.0/go.mod h1:5GrSd9rOnKOpZaji1OZLYL/yeAAtGDlo/cFe+8K5n8E= diff --git a/internal/api/api.go b/internal/api/api.go index f61cb5c..11fd4af 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -1,17 +1,15 @@ package api import ( - "bytes" "context" "encoding/base64" "encoding/json" "github.com/creekorful/trandoshan/internal/util/logging" natsutil "github.com/creekorful/trandoshan/internal/util/nats" "github.com/creekorful/trandoshan/pkg/proto" - "github.com/elastic/go-elasticsearch/v7" - "github.com/elastic/go-elasticsearch/v7/esapi" "github.com/labstack/echo/v4" "github.com/nats-io/nats.go" + "github.com/olivere/elastic/v7" "github.com/rs/zerolog/log" "github.com/urfave/cli/v2" "net/http" @@ -75,7 +73,10 @@ func execute(ctx *cli.Context) error { defer nc.Close() // Create Elasticsearch client - es, err := elasticsearch.NewClient(elasticsearch.Config{Addresses: []string{ctx.String("elasticsearch-uri")}}) + es, err := elastic.NewClient( + elastic.SetURL(ctx.String("elasticsearch-uri")), + elastic.SetHealthcheck(false), + ) if err != nil { log.Err(err).Msg("Error while creating ES client") return err @@ -91,8 +92,9 @@ func 
execute(ctx *cli.Context) error { return e.Start(":8080") } -func searchResources(es *elasticsearch.Client) echo.HandlerFunc { +func searchResources(es *elastic.Client) echo.HandlerFunc { return func(c echo.Context) error { + // Decode the URL-safe base64 value of the url query parameter b64URL := c.QueryParam("url") b, err := base64.URLEncoding.DecodeString(b64URL) if err != nil { @@ -100,69 +102,32 @@ func searchResources(es *elasticsearch.Client) echo.HandlerFunc { return c.NoContent(http.StatusInternalServerError) } - var buf bytes.Buffer - query := map[string]interface{}{ - "query": map[string]interface{}{ - "match": map[string]interface{}{ - "url": string(b), - }, - }, - } - if err := json.NewEncoder(&buf).Encode(query); err != nil { - log.Err(err).Msg("Error encoding query") - return c.NoContent(http.StatusInternalServerError) - } - // Perform the search request. - res, err := es.Search( - es.Search.WithContext(context.Background()), - es.Search.WithIndex("resources"), - es.Search.WithBody(&buf), - ) - if err != nil || (res.IsError() && res.StatusCode != http.StatusNotFound) { - evt := log.Err(err) - if res != nil { - evt.Int("status", res.StatusCode) - } - evt.Msg("Error getting response from ES") - - return c.NoContent(http.StatusInternalServerError) - } - - // In case the collection does not already exist - // ES will return 404 NOT FOUND - if res.StatusCode == http.StatusNotFound { - return c.JSON(http.StatusOK, []proto.ResourceDto{}) - } - - var resp map[string]interface{} - if err := json.NewDecoder(res.Body).Decode(&resp); err != nil { - log.Err(err).Msg("Error parsing the response body from ES") + query := elastic.NewMatchQuery("url", string(b)) + res, err := es.Search(). + Index("resources"). + Query(query). 
+ Do(context.Background()) + if err != nil { + log.Err(err).Msg("Error while searching on ES") return c.NoContent(http.StatusInternalServerError) } - var urls []proto.ResourceDto - for _, rawHit := range resp["hits"].(map[string]interface{})["hits"].([]interface{}) { - rawSrc := rawHit.(map[string]interface{})["_source"].(map[string]interface{}) - - res := proto.ResourceDto{ - URL: rawSrc["url"].(string), - Title: rawSrc["title"].(string), - } - - t, err := time.Parse(time.RFC3339, rawSrc["time"].(string)) - if err == nil { - res.Time = t + var resources []proto.ResourceDto + for _, hit := range res.Hits.Hits { + var resource proto.ResourceDto + if err := json.Unmarshal(hit.Source, &resource); err != nil { + log.Warn().Str("err", err.Error()).Msg("Error while un-marshaling resource") + continue } - - urls = append(urls, res) + resources = append(resources, resource) } - return c.JSON(http.StatusOK, urls) + return c.JSON(http.StatusOK, resources) } } -func addResource(es *elasticsearch.Client) echo.HandlerFunc { +func addResource(es *elastic.Client) echo.HandlerFunc { return func(c echo.Context) error { var resourceDto proto.ResourceDto if err := json.NewDecoder(c.Request().Body).Decode(&resourceDto); err != nil { @@ -172,8 +137,6 @@ func addResource(es *elasticsearch.Client) echo.HandlerFunc { log.Debug().Str("url", resourceDto.URL).Msg("Saving resource") - // TODO store on file system - // Create Elasticsearch document doc := resourceIndex{ URL: protocolRegex.ReplaceAllLiteralString(resourceDto.URL, ""), @@ -182,25 +145,14 @@ func addResource(es *elasticsearch.Client) echo.HandlerFunc { Time: time.Now(), } - // Serialize document into json - docBytes, err := json.Marshal(&doc) + _, err := es.Index(). + Index("resources"). + BodyJson(doc). 
+ Do(context.Background()) if err != nil { - log.Err(err).Msg("Error while serializing document into json") - return c.NoContent(http.StatusInternalServerError) - } - - // Use Elasticsearch to index document - req := esapi.IndexRequest{ - Index: "resources", - Body: bytes.NewReader(docBytes), - Refresh: "true", - } - res, err := req.Do(context.Background(), es) - if err != nil { - log.Err(err).Msg("Error while creating elasticsearch index") - return c.NoContent(http.StatusInternalServerError) + log.Err(err).Msg("Error while creating ES document") + return c.NoContent(http.StatusInternalServerError) } - defer res.Body.Close() log.Debug().Str("url", resourceDto.URL).Msg("Successfully saved resource") @@ -217,7 +169,7 @@ func addURL(nc *nats.Conn) echo.HandlerFunc { } // Publish the URL - if err := natsutil.PublishJSON(nc, proto.URLFoundSubject, &proto.URLFoundMsg{URL: url}); err != nil { + if err := natsutil.PublishMsg(nc, &proto.URLFoundMsg{URL: url}); err != nil { log.Err(err).Msg("Unable to publish URL") return c.NoContent(http.StatusInternalServerError) } diff --git a/internal/crawler/crawler.go b/internal/crawler/crawler.go index e917645..9544e5f 100644 --- a/internal/crawler/crawler.go +++ b/internal/crawler/crawler.go @@ -10,7 +10,6 @@ import ( "github.com/urfave/cli/v2" "github.com/valyala/fasthttp" "github.com/valyala/fasthttp/fasthttpproxy" - "mvdan.cc/xurls/v2" "strings" "time" ) @@ -79,7 +78,8 @@ func execute(ctx *cli.Context) error { log.Info().Msg("Successfully initialized tdsh-crawler. 
Waiting for URLs") - if err := sub.QueueSubscribe(proto.URLTodoSubject, "crawlers", handleMessage(httpClient, ctx.StringSlice("allowed-ct"))); err != nil { + if err := sub.QueueSubscribe(proto.URLTodoSubject, "crawlers", + handleMessage(httpClient, ctx.StringSlice("allowed-ct"))); err != nil { return err } @@ -89,7 +89,7 @@ func execute(ctx *cli.Context) error { func handleMessage(httpClient *fasthttp.Client, allowedContentTypes []string) natsutil.MsgHandler { return func(nc *nats.Conn, msg *nats.Msg) error { var urlMsg proto.URLTodoMsg - if err := natsutil.ReadJSON(msg, &urlMsg); err != nil { + if err := natsutil.ReadMsg(msg, &urlMsg); err != nil { return err } @@ -126,27 +126,14 @@ func handleMessage(httpClient *fasthttp.Client, allowedContentTypes []string) na body := string(resp.Body()) // Publish resource body - res := proto.ResourceMsg{ + res := proto.NewResourceMsg{ URL: urlMsg.URL, Body: body, } - if err := natsutil.PublishJSON(nc, proto.ResourceSubject, &res); err != nil { + if err := natsutil.PublishMsg(nc, &res); err != nil { log.Err(err).Msg("Error while publishing resource body") } - // Extract URLs - xu := xurls.Strict() - urls := xu.FindAllString(body, -1) - - // Publish found URLs - for _, url := range urls { - log.Trace().Str("url", url).Msg("Found URL") - - if err := natsutil.PublishJSON(nc, proto.URLFoundSubject, &proto.URLFoundMsg{URL: url}); err != nil { - log.Err(err).Msg("Error while publishing URL") - } - } - return nil } } diff --git a/internal/persister/persister.go b/internal/extractor/extractor.go similarity index 58% rename from internal/persister/persister.go rename to internal/extractor/extractor.go index 4efc053..958f073 100644 --- a/internal/persister/persister.go +++ b/internal/extractor/extractor.go @@ -1,7 +1,6 @@ -package persister +package extractor import ( - "fmt" "github.com/creekorful/trandoshan/internal/util/http" "github.com/creekorful/trandoshan/internal/util/logging" natsutil 
"github.com/creekorful/trandoshan/internal/util/nats" @@ -11,12 +10,12 @@ import ( "github.com/urfave/cli/v2" ) -// GetApp return the persister app +// GetApp return the extractor app func GetApp() *cli.App { return &cli.App{ - Name: "tdsh-persister", + Name: "tdsh-extractor", Version: "0.3.0", - Usage: "Trandoshan persister process", + Usage: "Trandoshan extractor process", Flags: []cli.Flag{ logging.GetLogFlag(), &cli.StringFlag{ @@ -37,7 +36,7 @@ func GetApp() *cli.App { func execute(ctx *cli.Context) error { logging.ConfigureLogger(ctx) - log.Info().Str("ver", ctx.App.Version).Msg("Starting tdsh-persister") + log.Info().Str("ver", ctx.App.Version).Msg("Starting tdsh-extractor") log.Debug().Str("uri", ctx.String("nats-uri")).Msg("Using NATS server") log.Debug().Str("uri", ctx.String("api-uri")).Msg("Using API server") @@ -52,9 +51,9 @@ func execute(ctx *cli.Context) error { } defer sub.Close() - log.Info().Msg("Successfully initialized tdsh-persister. Waiting for resources") + log.Info().Msg("Successfully initialized tdsh-extractor. 
Waiting for resources") - if err := sub.QueueSubscribe(proto.ResourceSubject, "persisters", handleMessage(httpClient, ctx.String("api-uri"))); err != nil { + if err := sub.QueueSubscribe(proto.NewResourceSubject, "extractors", handleMessage(httpClient, ctx.String("api-uri"))); err != nil { return err } @@ -63,26 +62,14 @@ func execute(ctx *cli.Context) error { func handleMessage(httpClient *http.Client, apiURI string) natsutil.MsgHandler { return func(nc *nats.Conn, msg *nats.Msg) error { - var resMsg proto.ResourceMsg - if err := natsutil.ReadJSON(msg, &resMsg); err != nil { + var resMsg proto.NewResourceMsg + if err := natsutil.ReadMsg(msg, &resMsg); err != nil { return err } - log.Debug().Str("url", resMsg.URL).Msg("Processing resource") - - url := fmt.Sprintf("%s/v1/resources", apiURI) - r, err := httpClient.JSONPost(url, &proto.ResourceDto{ - URL: resMsg.URL, - Body: resMsg.Body, - }, nil) - - if err != nil || r.StatusCode != http.StatusCreated { - log.Err(err).Msg("Error while sending resource to the API") - return err - } - - log.Debug().Str("url", resMsg.URL).Msg("Successfully processed resource") + log.Debug().Str("url", resMsg.URL).Msg("Processing new resource") + // TODO return nil } } diff --git a/internal/extractor/extractor_test.go b/internal/extractor/extractor_test.go new file mode 100644 index 0000000..6510df7 --- /dev/null +++ b/internal/extractor/extractor_test.go @@ -0,0 +1 @@ +package extractor diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index 7fbc384..bcc879c 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -98,7 +98,7 @@ func handleMessage(httpClient *http.Client, apiURI string) natsutil.MsgHandler { // No matches: schedule! 
if len(urls) == 0 { log.Debug().Stringer("url", normalizedURL).Msg("URL should be scheduled") - if err := natsutil.PublishJSON(nc, proto.URLTodoSubject, &proto.URLTodoMsg{URL: urlMsg.URL}); err != nil { + if err := natsutil.PublishMsg(nc, &proto.URLTodoMsg{URL: urlMsg.URL}); err != nil { return fmt.Errorf("error while publishing URL: %s", err) } } else { diff --git a/internal/util/nats/message.go b/internal/util/nats/message.go new file mode 100644 index 0000000..4697e0d --- /dev/null +++ b/internal/util/nats/message.go @@ -0,0 +1,5 @@ +package nats + +type Msg interface { + Subject() string +} diff --git a/internal/util/nats/nats.go b/internal/util/nats/nats.go index c5a72a9..e860b4d 100644 --- a/internal/util/nats/nats.go +++ b/internal/util/nats/nats.go @@ -6,6 +6,14 @@ import ( "github.com/nats-io/nats.go" ) +func PublishMsg(nc *nats.Conn, msg Msg) error { + return PublishJSON(nc, msg.Subject(), msg) +} + +func ReadMsg(nc *nats.Msg, msg Msg) error { + return ReadJSON(nc, msg) +} + // PublishJSON publish given message serialized in json with given subject func PublishJSON(nc *nats.Conn, subject string, msg interface{}) error { msgBytes, err := json.Marshal(msg) diff --git a/pkg/proto/proto.go b/pkg/proto/proto.go index 9f6cccc..b8639a9 100644 --- a/pkg/proto/proto.go +++ b/pkg/proto/proto.go @@ -3,12 +3,12 @@ package proto import "time" const ( - // URLTodoSubject represent the subject used by the crawler process to read the URL to crawl + // URLTodoSubject is the subject used when a URL is scheduled for crawling URLTodoSubject = "url.todo" - // URLFoundSubject represent the subject used by the scheduler process to read the URL to schedule + // URLFoundSubject is the subject used when a URL is extracted from a resource URLFoundSubject = "url.found" - // ResourceSubject represent the subject used by the persister process to store the resource body - ResourceSubject = "resource" + // NewResourceSubject is the subject used when a new resource has been crawled + 
NewResourceSubject = "resource.new" ) // URLTodoMsg represent an URL to crawl @@ -16,17 +16,28 @@ type URLTodoMsg struct { URL string `json:"url"` } +func (msg *URLTodoMsg) Subject() string { + return URLTodoSubject +} + // URLFoundMsg represent a found URL type URLFoundMsg struct { URL string `json:"url"` } -// ResourceMsg represent the body of a crawled resource -type ResourceMsg struct { +func (msg *URLFoundMsg) Subject() string { + return URLFoundSubject +} + +type NewResourceMsg struct { URL string `json:"url"` Body string `json:"body"` } +func (msg *NewResourceMsg) Subject() string { + return NewResourceSubject +} + // ResourceDto represent a resource as given by the API type ResourceDto struct { URL string `json:"url"` diff --git a/scripts/build.sh b/scripts/build.sh index 664b9be..b235964 100755 --- a/scripts/build.sh +++ b/scripts/build.sh @@ -1,7 +1,13 @@ #!/bin/bash +# set image tag if provided +tag="latest" +if [ "$1" ]; then + tag="$1" +fi + # build docker images for path in build/docker/Dockerfile-*; do name=$(echo "$path" | cut -d'-' -f2) - docker build . -f "$path" -t "creekorful/tdsh-$name:latest" + docker build . -f "$path" -t "creekorful/tdsh-$name:$tag" done