Refactored test dummy into implementation

pull/17/head
マリウス 2 years ago
parent 4d5c9b78ad
commit 0b874d1a34
No known key found for this signature in database
GPG Key ID: 272ED814BF63261F

@ -0,0 +1,228 @@
package database
import (
"context"
orbitdb "berty.tech/go-orbit-db"
"berty.tech/go-orbit-db/accesscontroller"
"berty.tech/go-orbit-db/iface"
"berty.tech/go-orbit-db/stores/documentstore"
config "github.com/ipfs/go-ipfs-config"
icore "github.com/ipfs/interface-go-ipfs-core"
"github.com/mitchellh/mapstructure"
"go.uber.org/zap"
"github.com/mrusme/superhighway84/models"
)
type Database struct {
ctx context.Context
URI string
Cache string
Logger *zap.Logger
IPFSNode icore.CoreAPI
OrbitDB orbitdb.OrbitDB
Store orbitdb.DocumentStore
}
func (db *Database)init() (error) {
var err error
db.OrbitDB, err = orbitdb.NewOrbitDB(db.ctx, db.IPFSNode, &orbitdb.NewOrbitDBOptions{
Directory: &db.Cache,
Logger: db.Logger,
})
if err != nil {
return err
}
ac := &accesscontroller.CreateAccessControllerOptions{
Access: map[string][]string{
"write": {
"*",
},
},
}
if err != nil {
return err
}
addr, err := db.OrbitDB.DetermineAddress(db.ctx, "sync-test", "docstore", &orbitdb.DetermineAddressOptions{})
if err != nil {
return err
}
db.URI = addr.String()
db.Store, err = db.OrbitDB.Docs(db.ctx, "sync-test", &orbitdb.CreateDBOptions{
AccessController: ac,
StoreSpecificOpts: documentstore.DefaultStoreOptsForMap("id"),
})
if err != nil {
return err
}
return nil
}
func (db *Database)open() (error) {
var err error
db.OrbitDB, err = orbitdb.NewOrbitDB(db.ctx, db.IPFSNode, &orbitdb.NewOrbitDBOptions{
Directory: &db.Cache,
})
if err != nil {
return err
}
create := false
storetype := "docstore"
dbstore, err := db.OrbitDB.Open(db.ctx, db.URI, &orbitdb.CreateDBOptions{
Create: &create,
StoreType: &storetype,
StoreSpecificOpts: documentstore.DefaultStoreOptsForMap("id"),
})
if err != nil {
return err
}
db.Store = dbstore.(orbitdb.DocumentStore)
return nil
}
func NewDatabase(
ctx context.Context,
dbURI string,
dbCache string,
dbInit bool,
logger *zap.Logger,
) (*Database, error) {
var err error
db := new(Database)
db.ctx = ctx
db.URI = dbURI
db.Cache = dbCache
db.Logger = logger
defaultPath, err := config.PathRoot()
if err != nil {
return nil, err
}
if err := setupPlugins(defaultPath); err != nil {
return nil, err
}
db.IPFSNode, err = createNode(ctx, defaultPath)
if err != nil {
return nil, err
}
if dbInit {
err = db.init()
if err != nil {
return nil, err
}
} else {
err = db.open()
if err != nil {
return nil, err
}
}
// someDirectory, err := getUnixfsNode(dbCache)
// if err != nil {
// panic(fmt.Errorf("Could not get File: %s", err))
// }
// cidDirectory, err := ipfs.Unixfs().Add(ctx, someDirectory)
// if err != nil {
// panic(fmt.Errorf("Could not add Directory: %s", err))
// }
//
// fmt.Printf("Added directory to IPFS with CID %s\n", cidDirectory.String())
err = db.Store.Load(ctx, -1)
if err != nil {
// TODO: clean up
return nil, err
}
// log.Println(db.Store.ReplicationStatus().GetBuffered())
// log.Println(db.Store.ReplicationStatus().GetQueued())
// log.Println(db.Store.ReplicationStatus().GetProgress())
db.Logger.Info("running ...")
return db, nil
}
func (db *Database) Connect() {
go func() {
err := connectToPeers(db.ctx, db.IPFSNode)
if err != nil {
db.Logger.Debug("failed to connect: %s", zap.Error(err))
} else {
db.Logger.Debug("connected to peer!")
}
}()
}
func (db *Database) Disconnect() {
db.OrbitDB.Close()
}
func (db *Database) SubmitArticle(article *models.Article) (error) {
entity := structToMap(&article)
entity["type"] = "article"
_, err := db.Store.Put(db.ctx, entity)
return err
}
func (db *Database) GetArticleByID(id string) (models.Article, error) {
entity, err := db.Store.Get(db.ctx, id, &iface.DocumentStoreGetOptions{CaseInsensitive: false})
if err != nil {
return models.Article{}, err
}
var article models.Article
err = mapstructure.Decode(entity[0], &article)
if err != nil {
return models.Article{}, err
}
return article, nil
}
func (db *Database) ListArticles() ([]models.Article, error) {
var articles []models.Article
entities, err := db.Store.Query(db.ctx, func(e interface{})(bool, error) {
entity := e.(map[string]interface{})
if entity["type"] == "article" {
return true, nil
}
return false, nil
})
if err != nil {
return articles, err
}
for _, entity := range entities {
var article models.Article
err = mapstructure.Decode(entity, &article)
if err != nil {
return articles, err
}
articles = append(articles, article)
}
return articles, nil
}

@ -0,0 +1,109 @@
package database
import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"path/filepath"
"sync"
config "github.com/ipfs/go-ipfs-config"
files "github.com/ipfs/go-ipfs-files"
"github.com/ipfs/go-ipfs/core"
"github.com/ipfs/go-ipfs/core/coreapi"
"github.com/ipfs/go-ipfs/core/node/libp2p"
"github.com/ipfs/go-ipfs/plugin/loader"
"github.com/ipfs/go-ipfs/repo/fsrepo"
icore "github.com/ipfs/interface-go-ipfs-core"
"github.com/libp2p/go-libp2p-core/peer"
)
func setupPlugins(path string) error {
// Load plugins. This will skip the repo if not available.
plugins, err := loader.NewPluginLoader(filepath.Join(path, "plugins"))
if err != nil {
return fmt.Errorf("error loading plugins: %s", err)
}
if err := plugins.Initialize(); err != nil {
return fmt.Errorf("error initializing plugins: %s", err)
}
if err := plugins.Inject(); err != nil {
return fmt.Errorf("error initializing plugins: %s", err)
}
return nil
}
func createNode(ctx context.Context, repoPath string) (icore.CoreAPI, error) {
repo, err := fsrepo.Open(repoPath)
if err != nil {
return nil, err
}
nodeOptions := &core.BuildCfg{
Online: true,
Routing: libp2p.DHTOption,
Repo: repo,
ExtraOpts: map[string]bool{
"pubsub": true,
},
}
node, err := core.NewNode(ctx, nodeOptions)
if err != nil {
return nil, err
}
// Attach the Core API to the constructed node
return coreapi.NewCoreAPI(node)
}
func connectToPeers(ctx context.Context, ipfs icore.CoreAPI) error {
var wg sync.WaitGroup
peerInfos, err := config.DefaultBootstrapPeers()
if err != nil {
return err
}
wg.Add(len(peerInfos))
for _, peerInfo := range peerInfos {
go func(peerInfo *peer.AddrInfo) {
defer wg.Done()
err := ipfs.Swarm().Connect(ctx, *peerInfo)
if err != nil {
log.Printf("failed to connect to %s: %s", peerInfo.ID, err)
} else {
log.Printf("connected to %s!", peerInfo.ID)
}
}(&peerInfo)
}
wg.Wait()
return nil
}
func getUnixfsNode(path string) (files.Node, error) {
st, err := os.Stat(path)
if err != nil {
return nil, err
}
f, err := files.NewSerialFile(path, false, st)
if err != nil {
return nil, err
}
return f, nil
}
func structToMap(v interface{}) (map[string]interface{}) {
var vMap map[string]interface{}
data, _ := json.Marshal(v)
json.Unmarshal(data, &vMap)
return vMap
}

@ -157,6 +157,7 @@ require (
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 // indirect
github.com/minio/sha256-simd v1.0.0 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/mapstructure v1.4.3 // indirect
github.com/mr-tron/base58 v1.2.0 // indirect
github.com/multiformats/go-base32 v0.0.3 // indirect
github.com/multiformats/go-base36 v0.1.0 // indirect

@ -924,6 +924,8 @@ github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS4
github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY=
github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.4.3 h1:OVowDSCllw/YjdLkam3/sm7wEtOy59d8ndGgCcyj8cs=
github.com/mitchellh/mapstructure v1.4.3/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=

@ -0,0 +1,30 @@
package models
import (
"time"
"github.com/google/uuid"
)
type Article struct {
ID string `mmapstructure:"id" json:"id"`
InReplyToID string `mmapstructure:"in-reply-to-id" json:"in-reply-to-id"`
From string `mmapstructure:"from" json:"from"`
Newsgroup string `mmapstructure:"newsgroup" json:"newsgroup"`
Subject string `mmapstructure:"subject" json:"subject"`
Date int64 `mmapstructure:"date" json:"date"`
Organization string `mmapstructure:"organization" json:"organization"`
Body string `mmapstructure:"body" json:"body"`
}
func NewArticle() (*Article) {
article := new(Article)
id, _ := uuid.NewUUID()
article.ID = id.String()
article.Date = time.Now().UnixNano() / int64(time.Millisecond)
return article
}

@ -5,275 +5,44 @@ import (
"fmt"
"log"
"os"
"path/filepath"
"sync"
orbitdb "berty.tech/go-orbit-db"
"berty.tech/go-orbit-db/accesscontroller"
"berty.tech/go-orbit-db/iface"
"github.com/google/uuid"
config "github.com/ipfs/go-ipfs-config"
files "github.com/ipfs/go-ipfs-files"
"github.com/ipfs/go-ipfs/core"
"github.com/ipfs/go-ipfs/core/coreapi"
"github.com/ipfs/go-ipfs/core/node/libp2p"
"github.com/ipfs/go-ipfs/plugin/loader"
"github.com/ipfs/go-ipfs/repo/fsrepo"
icore "github.com/ipfs/interface-go-ipfs-core"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/mrusme/superhighway84/database"
"github.com/mrusme/superhighway84/models"
"go.uber.org/zap"
)
func setupPlugins(path string) error {
// Load plugins. This will skip the repo if not available.
plugins, err := loader.NewPluginLoader(filepath.Join(path, "plugins"))
if err != nil {
return fmt.Errorf("error loading plugins: %s", err)
}
if err := plugins.Initialize(); err != nil {
return fmt.Errorf("error initializing plugins: %s", err)
}
if err := plugins.Inject(); err != nil {
return fmt.Errorf("error initializing plugins: %s", err)
}
return nil
}
func createNode(ctx context.Context, repoPath string) (icore.CoreAPI, error) {
repo, err := fsrepo.Open(repoPath)
if err != nil {
return nil, err
}
nodeOptions := &core.BuildCfg{
Online: true,
Routing: libp2p.DHTOption,
Repo: repo,
ExtraOpts: map[string]bool{
"pubsub": true,
},
}
node, err := core.NewNode(ctx, nodeOptions)
if err != nil {
return nil, err
}
// Attach the Core API to the constructed node
return coreapi.NewCoreAPI(node)
}
func connectToPeers(ctx context.Context, ipfs icore.CoreAPI, peers []string) error {
var wg sync.WaitGroup
// peerInfos := make(map[peer.ID]*peer.AddrInfo, len(peers))
// for _, addrStr := range peers {
// addr, err := ma.NewMultiaddr(addrStr)
// if err != nil {
// return err
// }
// pii, err := peer.AddrInfoFromP2pAddr(addr)
// if err != nil {
// return err
// }
// pi, ok := peerInfos[pii.ID]
// if !ok {
// pi = &peer.AddrInfo{ID: pii.ID}
// peerInfos[pi.ID] = pi
// }
// pi.Addrs = append(pi.Addrs, pii.Addrs...)
// }
peerInfos, err := config.DefaultBootstrapPeers()
if err != nil {
return err
}
wg.Add(len(peerInfos))
for _, peerInfo := range peerInfos {
go func(peerInfo *peer.AddrInfo) {
defer wg.Done()
err := ipfs.Swarm().Connect(ctx, *peerInfo)
if err != nil {
log.Printf("failed to connect to %s: %s", peerInfo.ID, err)
} else {
log.Printf("connected to %s!", peerInfo.ID)
}
}(&peerInfo)
}
wg.Wait()
return nil
}
func getUnixfsNode(path string) (files.Node, error) {
st, err := os.Stat(path)
if err != nil {
return nil, err
}
f, err := files.NewSerialFile(path, false, st)
if err != nil {
return nil, err
}
return f, nil
}
func main() {
var err error
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
log.Println("press w for writer or r for reader")
var testtype string
var testid string
fmt.Scanln(&testtype)
if testtype == "r" {
log.Println("enter the id")
fmt.Scanln(&testid)
dbInit := false
dbInitValue := os.Getenv("SUPERHIGHWAY84_DB_INIT")
if dbInitValue == "1" {
dbInit = true
}
defaultPath, err := config.PathRoot()
if err != nil {
log.Println(err)
return
dbURI := os.Getenv("SUPERHIGHWAY84_DB_URI")
if dbInit == false && dbURI == "" {
log.Panicln("SUPERHIGHWAY84_DB_URI missing!")
}
log.Println(defaultPath)
if err := setupPlugins(defaultPath); err != nil {
log.Println(err)
return
}
ipfs, err := createNode(ctx, defaultPath)
if err != nil {
log.Println(err)
return
dbCache := os.Getenv("SUPERHIGHWAY84_DB_CACHE")
if dbCache == "" {
log.Panicln("SUPERHIGHWAY84_DB_CACHE missing!")
}
bootstrapNodes := []string{
// // IPFS Bootstrapper nodes.
// "/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN",
// "/dnsaddr/bootstrap.libp2p.io/p2p/QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa",
// "/dnsaddr/bootstrap.libp2p.io/p2p/QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb",
// "/dnsaddr/bootstrap.libp2p.io/p2p/QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt",
//
// // IPFS Cluster Pinning nodes
// "/ip4/138.201.67.219/tcp/4001/p2p/QmUd6zHcbkbcs7SMxwLs48qZVX3vpcM8errYS7xEczwRMA",
// "/ip4/138.201.67.219/udp/4001/quic/p2p/QmUd6zHcbkbcs7SMxwLs48qZVX3vpcM8errYS7xEczwRMA",
// "/ip4/138.201.67.220/tcp/4001/p2p/QmNSYxZAiJHeLdkBg38roksAR9So7Y5eojks1yjEcUtZ7i",
// "/ip4/138.201.67.220/udp/4001/quic/p2p/QmNSYxZAiJHeLdkBg38roksAR9So7Y5eojks1yjEcUtZ7i",
// "/ip4/138.201.68.74/tcp/4001/p2p/QmdnXwLrC8p1ueiq2Qya8joNvk3TVVDAut7PrikmZwubtR",
// "/ip4/138.201.68.74/udp/4001/quic/p2p/QmdnXwLrC8p1ueiq2Qya8joNvk3TVVDAut7PrikmZwubtR",
// "/ip4/94.130.135.167/tcp/4001/p2p/QmUEMvxS2e7iDrereVYc5SWPauXPyNwxcy9BXZrC1QTcHE",
// "/ip4/94.130.135.167/udp/4001/quic/p2p/QmUEMvxS2e7iDrereVYc5SWPauXPyNwxcy9BXZrC1QTcHE",
//
// // You can add more nodes here, for example, another IPFS node you might have running locally, mine was:
// // "/ip4/127.0.0.1/tcp/4010/p2p/QmZp2fhDLxjYue2RiUvLwT9MWdnbDxam32qYFnGmxZDh5L",
// // "/ip4/127.0.0.1/udp/4010/quic/p2p/QmZp2fhDLxjYue2RiUvLwT9MWdnbDxam32qYFnGmxZDh5L",
}
go func() {
err := connectToPeers(ctx, ipfs, bootstrapNodes)
if err != nil {
log.Printf("failed connect to peers: %s", err)
} else {
log.Println("connected to node!")
}
}()
logger, err := zap.NewDevelopment()
orbitDir := os.Getenv("SUPERHIGHWAY84_DB")
var orbitdb1 orbitdb.OrbitDB
var db1 orbitdb.DocumentStore
if testtype == "w" {
log.Println("Opening OrbitDB as writer ...")
orbitdb1, err = orbitdb.NewOrbitDB(ctx, ipfs, &orbitdb.NewOrbitDBOptions{
Directory: &orbitDir,
Logger: logger,
})
ac := &accesscontroller.CreateAccessControllerOptions{
Access: map[string][]string{
"write": {
"*",
},
},
}
if err != nil {
log.Println(err)
return
}
defer orbitdb1.Close()
log.Println(orbitdb1.Identity().ID)
addr, err := orbitdb1.DetermineAddress(ctx, "sync-test", "docstore", &orbitdb.DetermineAddressOptions{})
if err != nil {
log.Println(err)
return
}
log.Println(addr.String())
db1, err = orbitdb1.Docs(ctx, "sync-test", &orbitdb.CreateDBOptions{
AccessController: ac,
})
if err != nil {
log.Println(err)
return
}
} else {
log.Println("Opening OrbitDB as reader ...")
orbitdb1, err = orbitdb.NewOrbitDB(ctx, ipfs, &orbitdb.NewOrbitDBOptions{
Directory: &orbitDir,
})
if err != nil {
log.Println(err)
return
}
log.Println("NewOrbitDB succeeded")
create := false
storetype := "docstore"
dbstore, err := orbitdb1.Open(ctx, testid, &orbitdb.CreateDBOptions{Create: &create, StoreType: &storetype})
if err != nil {
log.Println(err)
return
}
log.Println("Test")
db1 = dbstore.(orbitdb.DocumentStore)
if err != nil {
log.Panicln(err)
}
log.Println("opened!")
// someDirectory, err := getUnixfsNode(orbitDir)
// if err != nil {
// panic(fmt.Errorf("Could not get File: %s", err))
// }
// cidDirectory, err := ipfs.Unixfs().Add(ctx, someDirectory)
// if err != nil {
// panic(fmt.Errorf("Could not add Directory: %s", err))
// }
//
// fmt.Printf("Added directory to IPFS with CID %s\n", cidDirectory.String())
if testtype == "w" {
} else {
err = db1.Load(ctx, -1)
if err != nil {
log.Println(err)
return
}
db, err := database.NewDatabase(ctx, dbURI, dbCache, dbInit, logger)
if err != nil {
log.Panicln(err)
}
log.Println(db1.ReplicationStatus().GetBuffered())
log.Println(db1.ReplicationStatus().GetQueued())
log.Println(db1.ReplicationStatus().GetProgress())
log.Println("Running ...")
defer db.Disconnect()
db.Connect()
var input string
for {
@ -284,31 +53,34 @@ func main() {
return
case "g":
fmt.Scanln(&input)
docs, err := db1.Get(ctx, input, &iface.DocumentStoreGetOptions{CaseInsensitive: false})
article, err := db.GetArticleByID(input)
if err != nil {
log.Println(err)
} else {
log.Println(docs)
log.Println(article)
}
case "p":
id, _ := uuid.NewUUID()
_, err = db1.Put(ctx, map[string]interface{}{"_id": id.String(), "hello": "world"})
article := models.NewArticle()
article.From = "test@example.com"
article.Newsgroup = "comp.test"
article.Subject = "This is a test!"
article.Body = "Hey there, this is a test!"
err = db.SubmitArticle(article)
if err != nil {
log.Println(err)
} else {
log.Println(id)
log.Println(article)
}
case "l":
docs, err := db1.Query(ctx, func(e interface{})(bool, error) {
return true, nil
})
articles, err := db.ListArticles()
if err != nil {
log.Println(err)
} else {
log.Println(docs)
log.Println(articles)
}
}
}
}

Loading…
Cancel
Save