mirror of https://github.com/lightninglabs/loop
loopdb: create base sqlite store
parent
206e463db7
commit
f8c65bbc4c
@ -0,0 +1,147 @@
|
||||
package loopdb
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
"io/fs"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/golang-migrate/migrate/v4"
|
||||
"github.com/golang-migrate/migrate/v4/database"
|
||||
"github.com/golang-migrate/migrate/v4/source/httpfs"
|
||||
)
|
||||
|
||||
// applyMigrations executes all database migration files found in the given file
|
||||
// system under the given path, using the passed database driver and database
|
||||
// name.
|
||||
func applyMigrations(fs fs.FS, driver database.Driver, path,
|
||||
dbName string) error {
|
||||
|
||||
// With the migrate instance open, we'll create a new migration source
|
||||
// using the embedded file system stored in sqlSchemas. The library
|
||||
// we're using can't handle a raw file system interface, so we wrap it
|
||||
// in this intermediate layer.
|
||||
migrateFileServer, err := httpfs.New(http.FS(fs), path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Finally, we'll run the migration with our driver above based on the
|
||||
// open DB, and also the migration source stored in the file system
|
||||
// above.
|
||||
sqlMigrate, err := migrate.NewWithInstance(
|
||||
"migrations", migrateFileServer, dbName, driver,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = sqlMigrate.Up()
|
||||
if err != nil && err != migrate.ErrNoChange {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// replacerFS is an implementation of a fs.FS virtual file system that wraps an
|
||||
// existing file system but does a search-and-replace operation on each file
|
||||
// when it is opened.
|
||||
type replacerFS struct {
|
||||
parentFS fs.FS
|
||||
replaces map[string]string
|
||||
}
|
||||
|
||||
// A compile-time assertion to make sure replacerFS implements the fs.FS
|
||||
// interface.
|
||||
var _ fs.FS = (*replacerFS)(nil)
|
||||
|
||||
// newReplacerFS creates a new replacer file system, wrapping the given parent
|
||||
// virtual file system. Each file within the file system is undergoing a
|
||||
// search-and-replace operation when it is opened, using the given map where the
|
||||
// key denotes the search term and the value the term to replace each occurrence
|
||||
// with.
|
||||
func newReplacerFS(parent fs.FS, replaces map[string]string) *replacerFS {
|
||||
return &replacerFS{
|
||||
parentFS: parent,
|
||||
replaces: replaces,
|
||||
}
|
||||
}
|
||||
|
||||
// Open opens a file in the virtual file system.
|
||||
//
|
||||
// NOTE: This is part of the fs.FS interface.
|
||||
func (t *replacerFS) Open(name string) (fs.File, error) {
|
||||
f, err := t.parentFS.Open(name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
stat, err := f.Stat()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if stat.IsDir() {
|
||||
return f, err
|
||||
}
|
||||
|
||||
return newReplacerFile(f, t.replaces)
|
||||
}
|
||||
|
||||
type replacerFile struct {
|
||||
parentFile fs.File
|
||||
buf bytes.Buffer
|
||||
}
|
||||
|
||||
// A compile-time assertion to make sure replacerFile implements the fs.File
|
||||
// interface.
|
||||
var _ fs.File = (*replacerFile)(nil)
|
||||
|
||||
func newReplacerFile(parent fs.File, replaces map[string]string) (*replacerFile,
|
||||
error) {
|
||||
|
||||
content, err := io.ReadAll(parent)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
contentStr := string(content)
|
||||
for from, to := range replaces {
|
||||
contentStr = strings.Replace(contentStr, from, to, -1)
|
||||
}
|
||||
|
||||
var buf bytes.Buffer
|
||||
_, err = buf.WriteString(contentStr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &replacerFile{
|
||||
parentFile: parent,
|
||||
buf: buf,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Stat returns statistics/info about the file.
|
||||
//
|
||||
// NOTE: This is part of the fs.File interface.
|
||||
func (t *replacerFile) Stat() (fs.FileInfo, error) {
|
||||
return t.parentFile.Stat()
|
||||
}
|
||||
|
||||
// Read reads as many bytes as possible from the file into the given slice.
|
||||
//
|
||||
// NOTE: This is part of the fs.File interface.
|
||||
func (t *replacerFile) Read(bytes []byte) (int, error) {
|
||||
return t.buf.Read(bytes)
|
||||
}
|
||||
|
||||
// Close closes the underlying file.
|
||||
//
|
||||
// NOTE: This is part of the fs.File interface.
|
||||
func (t *replacerFile) Close() error {
|
||||
// We already fully read and then closed the file when creating this
|
||||
// instance, so there's nothing to do for us here.
|
||||
return nil
|
||||
}
|
@ -0,0 +1,8 @@
|
||||
package loopdb
|
||||
|
||||
import (
|
||||
"embed"
|
||||
)
|
||||
|
||||
//go:embed sqlc/migrations/*.up.sql
|
||||
var sqlSchemas embed.FS
|
@ -0,0 +1,71 @@
|
||||
package loopdb
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/jackc/pgconn"
|
||||
"github.com/jackc/pgerrcode"
|
||||
"modernc.org/sqlite"
|
||||
sqlite3 "modernc.org/sqlite/lib"
|
||||
)
|
||||
|
||||
// MapSQLError attempts to interpret a given error as a database agnostic SQL
|
||||
// error.
|
||||
func MapSQLError(err error) error {
|
||||
// Attempt to interpret the error as a sqlite error.
|
||||
var sqliteErr *sqlite.Error
|
||||
if errors.As(err, &sqliteErr) {
|
||||
return parseSqliteError(sqliteErr)
|
||||
}
|
||||
|
||||
// Attempt to interpret the error as a postgres error.
|
||||
var pqErr *pgconn.PgError
|
||||
if errors.As(err, &pqErr) {
|
||||
return parsePostgresError(pqErr)
|
||||
}
|
||||
|
||||
// Return original error if it could not be classified as a database
|
||||
// specific error.
|
||||
return err
|
||||
}
|
||||
|
||||
// parsePostgresError attempts to parse a sqlite error as a database agnostic
|
||||
// SQL error.
|
||||
func parseSqliteError(sqliteErr *sqlite.Error) error {
|
||||
switch sqliteErr.Code() {
|
||||
// Handle unique constraint violation error.
|
||||
case sqlite3.SQLITE_CONSTRAINT_UNIQUE:
|
||||
return &ErrSqlUniqueConstraintViolation{
|
||||
DbError: sqliteErr,
|
||||
}
|
||||
|
||||
default:
|
||||
return fmt.Errorf("unknown sqlite error: %w", sqliteErr)
|
||||
}
|
||||
}
|
||||
|
||||
// parsePostgresError attempts to parse a postgres error as a database agnostic
|
||||
// SQL error.
|
||||
func parsePostgresError(pqErr *pgconn.PgError) error {
|
||||
switch pqErr.Code {
|
||||
// Handle unique constraint violation error.
|
||||
case pgerrcode.UniqueViolation:
|
||||
return &ErrSqlUniqueConstraintViolation{
|
||||
DbError: pqErr,
|
||||
}
|
||||
|
||||
default:
|
||||
return fmt.Errorf("unknown postgres error: %w", pqErr)
|
||||
}
|
||||
}
|
||||
|
||||
// ErrSqlUniqueConstraintViolation is an error type which represents a database
|
||||
// agnostic SQL unique constraint violation.
|
||||
type ErrSqlUniqueConstraintViolation struct {
|
||||
DbError error
|
||||
}
|
||||
|
||||
func (e ErrSqlUniqueConstraintViolation) Error() string {
|
||||
return fmt.Sprintf("sql unique constraint violation: %v", e.DbError)
|
||||
}
|
@ -0,0 +1,221 @@
|
||||
package loopdb
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/btcsuite/btcd/chaincfg"
|
||||
sqlite_migrate "github.com/golang-migrate/migrate/v4/database/sqlite"
|
||||
"github.com/lightninglabs/loop/loopdb/sqlc"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
_ "modernc.org/sqlite" // Register relevant drivers.
|
||||
)
|
||||
|
||||
const (
|
||||
// sqliteOptionPrefix is the string prefix sqlite uses to set various
|
||||
// options. This is used in the following format:
|
||||
// * sqliteOptionPrefix || option_name = option_value.
|
||||
sqliteOptionPrefix = "_pragma"
|
||||
)
|
||||
|
||||
// SqliteConfig holds all the config arguments needed to interact with our
|
||||
// sqlite DB.
|
||||
type SqliteConfig struct {
|
||||
// SkipMigrations if true, then all the tables will be created on start
|
||||
// up if they don't already exist.
|
||||
SkipMigrations bool `long:"skipmigrations" description:"Skip applying migrations on startup."`
|
||||
|
||||
// DatabaseFileName is the full file path where the database file can be
|
||||
// found.
|
||||
DatabaseFileName string `long:"dbfile" description:"The full path to the database."`
|
||||
}
|
||||
|
||||
// SqliteSwapStore is a sqlite3 based database for the loop daemon.
|
||||
type SqliteSwapStore struct {
|
||||
cfg *SqliteConfig
|
||||
|
||||
*BaseDB
|
||||
}
|
||||
|
||||
// NewSqliteStore attempts to open a new sqlite database based on the passed
|
||||
// config.
|
||||
func NewSqliteStore(cfg *SqliteConfig, network *chaincfg.Params) (*SqliteSwapStore, error) {
|
||||
// The set of pragma options are accepted using query options. For now
|
||||
// we only want to ensure that foreign key constraints are properly
|
||||
// enforced.
|
||||
pragmaOptions := []struct {
|
||||
name string
|
||||
value string
|
||||
}{
|
||||
{
|
||||
name: "foreign_keys",
|
||||
value: "on",
|
||||
},
|
||||
{
|
||||
name: "journal_mode",
|
||||
value: "WAL",
|
||||
},
|
||||
{
|
||||
name: "busy_timeout",
|
||||
value: "5000",
|
||||
},
|
||||
}
|
||||
sqliteOptions := make(url.Values)
|
||||
for _, option := range pragmaOptions {
|
||||
sqliteOptions.Add(
|
||||
sqliteOptionPrefix,
|
||||
fmt.Sprintf("%v=%v", option.name, option.value),
|
||||
)
|
||||
}
|
||||
|
||||
// Construct the DSN which is just the database file name, appended
|
||||
// with the series of pragma options as a query URL string. For more
|
||||
// details on the formatting here, see the modernc.org/sqlite docs:
|
||||
// https://pkg.go.dev/modernc.org/sqlite#Driver.Open.
|
||||
dsn := fmt.Sprintf(
|
||||
"%v?%v", cfg.DatabaseFileName, sqliteOptions.Encode(),
|
||||
)
|
||||
db, err := sql.Open("sqlite", dsn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if !cfg.SkipMigrations {
|
||||
// Now that the database is open, populate the database with
|
||||
// our set of schemas based on our embedded in-memory file
|
||||
// system.
|
||||
//
|
||||
// First, we'll need to open up a new migration instance for
|
||||
// our current target database: sqlite.
|
||||
driver, err := sqlite_migrate.WithInstance(
|
||||
db, &sqlite_migrate.Config{},
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = applyMigrations(
|
||||
sqlSchemas, driver, "sqlc/migrations", "sqlc",
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
queries := sqlc.New(db)
|
||||
|
||||
return &SqliteSwapStore{
|
||||
cfg: cfg,
|
||||
BaseDB: &BaseDB{
|
||||
DB: db,
|
||||
Queries: queries,
|
||||
network: network,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// NewTestSqliteDB is a helper function that creates an SQLite database for
|
||||
// testing.
|
||||
func NewTestSqliteDB(t *testing.T) *SqliteSwapStore {
|
||||
t.Helper()
|
||||
|
||||
t.Logf("Creating new SQLite DB for testing")
|
||||
|
||||
dbFileName := filepath.Join(t.TempDir(), "tmp.db")
|
||||
sqlDB, err := NewSqliteStore(&SqliteConfig{
|
||||
DatabaseFileName: dbFileName,
|
||||
SkipMigrations: false,
|
||||
}, &chaincfg.MainNetParams)
|
||||
require.NoError(t, err)
|
||||
|
||||
t.Cleanup(func() {
|
||||
require.NoError(t, sqlDB.DB.Close())
|
||||
})
|
||||
|
||||
return sqlDB
|
||||
}
|
||||
|
||||
// BaseDB is the base database struct that each implementation can embed to
|
||||
// gain some common functionality.
|
||||
type BaseDB struct {
|
||||
network *chaincfg.Params
|
||||
|
||||
*sql.DB
|
||||
|
||||
*sqlc.Queries
|
||||
}
|
||||
|
||||
// BeginTx wraps the normal sql specific BeginTx method with the TxOptions
|
||||
// interface. This interface is then mapped to the concrete sql tx options
|
||||
// struct.
|
||||
func (db *BaseDB) BeginTx(ctx context.Context,
|
||||
opts TxOptions) (*sql.Tx, error) {
|
||||
|
||||
sqlOptions := sql.TxOptions{
|
||||
ReadOnly: opts.ReadOnly(),
|
||||
}
|
||||
return db.DB.BeginTx(ctx, &sqlOptions)
|
||||
}
|
||||
|
||||
// ExecTx is a wrapper for txBody to abstract the creation and commit of a db
|
||||
// transaction. The db transaction is embedded in a `*postgres.Queries` that
|
||||
// txBody needs to use when executing each one of the queries that need to be
|
||||
// applied atomically.
|
||||
func (db *BaseDB) ExecTx(ctx context.Context, txOptions TxOptions,
|
||||
txBody func(*sqlc.Queries) error) error {
|
||||
|
||||
// Create the db transaction.
|
||||
tx, err := db.BeginTx(ctx, txOptions)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Rollback is safe to call even if the tx is already closed, so if
|
||||
// the tx commits successfully, this is a no-op.
|
||||
defer tx.Rollback() //nolint: errcheck
|
||||
|
||||
if err := txBody(db.Queries.WithTx(tx)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Commit transaction.
|
||||
if err = tx.Commit(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// TxOptions represents a set of options one can use to control what type of
|
||||
// database transaction is created. Transaction can wither be read or write.
|
||||
type TxOptions interface {
|
||||
// ReadOnly returns true if the transaction should be read only.
|
||||
ReadOnly() bool
|
||||
}
|
||||
|
||||
// SqliteTxOptions defines the set of db txn options the KeyStore
|
||||
// understands.
|
||||
type SqliteTxOptions struct {
|
||||
// readOnly governs if a read only transaction is needed or not.
|
||||
readOnly bool
|
||||
}
|
||||
|
||||
// NewKeyStoreReadOpts returns a new KeyStoreTxOptions instance triggers a read
|
||||
// transaction.
|
||||
func NewSqlReadOpts() *SqliteTxOptions {
|
||||
return &SqliteTxOptions{
|
||||
readOnly: true,
|
||||
}
|
||||
}
|
||||
|
||||
// ReadOnly returns true if the transaction should be read only.
|
||||
//
|
||||
// NOTE: This implements the TxOptions
|
||||
func (r *SqliteTxOptions) ReadOnly() bool {
|
||||
return r.readOnly
|
||||
}
|
Loading…
Reference in New Issue