From f8c65bbc4ce92d11f485f0d3cdc76bb4b20ff08d Mon Sep 17 00:00:00 2001
From: sputn1ck
Date: Wed, 17 May 2023 19:07:58 +0200
Subject: [PATCH] loopdb: create base sqlite store

---
 loopdb/migrations.go | 147 ++++++++++++++++++++++++++++
 loopdb/schemas.go    |   8 ++
 loopdb/sqlerrors.go  |  71 ++++++++++++++
 loopdb/sqlite.go     | 221 +++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 447 insertions(+)
 create mode 100644 loopdb/migrations.go
 create mode 100644 loopdb/schemas.go
 create mode 100644 loopdb/sqlerrors.go
 create mode 100644 loopdb/sqlite.go

diff --git a/loopdb/migrations.go b/loopdb/migrations.go
new file mode 100644
index 0000000..bdf40b2
--- /dev/null
+++ b/loopdb/migrations.go
@@ -0,0 +1,147 @@
+package loopdb
+
+import (
+	"bytes"
+	"io"
+	"io/fs"
+	"net/http"
+	"strings"
+
+	"github.com/golang-migrate/migrate/v4"
+	"github.com/golang-migrate/migrate/v4/database"
+	"github.com/golang-migrate/migrate/v4/source/httpfs"
+)
+
+// applyMigrations executes all database migration files found in the given file
+// system under the given path, using the passed database driver and database
+// name.
+func applyMigrations(fs fs.FS, driver database.Driver, path,
+	dbName string) error {
+
+	// With the migrate instance open, we'll create a new migration source
+	// using the embedded file system stored in sqlSchemas. The library
+	// we're using can't handle a raw file system interface, so we wrap it
+	// in this intermediate layer.
+	migrateFileServer, err := httpfs.New(http.FS(fs), path)
+	if err != nil {
+		return err
+	}
+
+	// Finally, we'll run the migration with our driver above based on the
+	// open DB, and also the migration source stored in the file system
+	// above.
+	sqlMigrate, err := migrate.NewWithInstance(
+		"migrations", migrateFileServer, dbName, driver,
+	)
+	if err != nil {
+		return err
+	}
+	err = sqlMigrate.Up()
+	if err != nil && err != migrate.ErrNoChange {
+		return err
+	}
+
+	return nil
+}
+
+// replacerFS is an implementation of a fs.FS virtual file system that wraps an
+// existing file system but does a search-and-replace operation on each file
+// when it is opened.
+type replacerFS struct {
+	parentFS fs.FS
+	replaces map[string]string
+}
+
+// A compile-time assertion to make sure replacerFS implements the fs.FS
+// interface.
+var _ fs.FS = (*replacerFS)(nil)
+
+// newReplacerFS creates a new replacer file system, wrapping the given parent
+// virtual file system. Each file within the file system undergoes a
+// search-and-replace operation when it is opened, using the given map, where
+// the key denotes the search term and the value the term to replace each
+// occurrence with.
+func newReplacerFS(parent fs.FS, replaces map[string]string) *replacerFS {
+	return &replacerFS{
+		parentFS: parent,
+		replaces: replaces,
+	}
+}
+
+// Open opens a file in the virtual file system.
+//
+// NOTE: This is part of the fs.FS interface.
+func (t *replacerFS) Open(name string) (fs.File, error) {
+	f, err := t.parentFS.Open(name)
+	if err != nil {
+		return nil, err
+	}
+
+	stat, err := f.Stat()
+	if err != nil {
+		return nil, err
+	}
+
+	if stat.IsDir() {
+		return f, err
+	}
+
+	return newReplacerFile(f, t.replaces)
+}
+
+type replacerFile struct {
+	parentFile fs.File
+	buf        bytes.Buffer
+}
+
+// A compile-time assertion to make sure replacerFile implements the fs.File
+// interface.
+var _ fs.File = (*replacerFile)(nil)
+
+func newReplacerFile(parent fs.File, replaces map[string]string) (*replacerFile,
+	error) {
+
+	content, err := io.ReadAll(parent)
+	if err != nil {
+		return nil, err
+	}
+
+	contentStr := string(content)
+	for from, to := range replaces {
+		contentStr = strings.Replace(contentStr, from, to, -1)
+	}
+
+	var buf bytes.Buffer
+	_, err = buf.WriteString(contentStr)
+	if err != nil {
+		return nil, err
+	}
+
+	return &replacerFile{
+		parentFile: parent,
+		buf:        buf,
+	}, nil
+}
+
+// Stat returns statistics/info about the file.
+//
+// NOTE: This is part of the fs.File interface.
+func (t *replacerFile) Stat() (fs.FileInfo, error) {
+	return t.parentFile.Stat()
+}
+
+// Read reads as many bytes as possible from the file into the given slice.
+//
+// NOTE: This is part of the fs.File interface.
+func (t *replacerFile) Read(bytes []byte) (int, error) {
+	return t.buf.Read(bytes)
+}
+
+// Close closes the underlying file.
+//
+// NOTE: This is part of the fs.File interface.
+func (t *replacerFile) Close() error {
+	// We already fully read and then closed the file when creating this
+	// instance, so there's nothing to do for us here.
+	return nil
+}
diff --git a/loopdb/schemas.go b/loopdb/schemas.go
new file mode 100644
index 0000000..8c9895c
--- /dev/null
+++ b/loopdb/schemas.go
@@ -0,0 +1,8 @@
+package loopdb
+
+import (
+	"embed"
+)
+
+//go:embed sqlc/migrations/*.up.sql
+var sqlSchemas embed.FS
diff --git a/loopdb/sqlerrors.go b/loopdb/sqlerrors.go
new file mode 100644
index 0000000..54562fc
--- /dev/null
+++ b/loopdb/sqlerrors.go
@@ -0,0 +1,71 @@
+package loopdb
+
+import (
+	"errors"
+	"fmt"
+
+	"github.com/jackc/pgconn"
+	"github.com/jackc/pgerrcode"
+	"modernc.org/sqlite"
+	sqlite3 "modernc.org/sqlite/lib"
+)
+
+// MapSQLError attempts to interpret a given error as a database agnostic SQL
+// error.
+func MapSQLError(err error) error {
+	// Attempt to interpret the error as a sqlite error.
+	var sqliteErr *sqlite.Error
+	if errors.As(err, &sqliteErr) {
+		return parseSqliteError(sqliteErr)
+	}
+
+	// Attempt to interpret the error as a postgres error.
+	var pqErr *pgconn.PgError
+	if errors.As(err, &pqErr) {
+		return parsePostgresError(pqErr)
+	}
+
+	// Return original error if it could not be classified as a database
+	// specific error.
+	return err
+}
+
+// parseSqliteError attempts to parse a sqlite error as a database agnostic
+// SQL error.
+func parseSqliteError(sqliteErr *sqlite.Error) error {
+	switch sqliteErr.Code() {
+	// Handle unique constraint violation error.
+	case sqlite3.SQLITE_CONSTRAINT_UNIQUE:
+		return &ErrSqlUniqueConstraintViolation{
+			DbError: sqliteErr,
+		}
+
+	default:
+		return fmt.Errorf("unknown sqlite error: %w", sqliteErr)
+	}
+}
+
+// parsePostgresError attempts to parse a postgres error as a database agnostic
+// SQL error.
+func parsePostgresError(pqErr *pgconn.PgError) error {
+	switch pqErr.Code {
+	// Handle unique constraint violation error.
+	case pgerrcode.UniqueViolation:
+		return &ErrSqlUniqueConstraintViolation{
+			DbError: pqErr,
+		}
+
+	default:
+		return fmt.Errorf("unknown postgres error: %w", pqErr)
+	}
+}
+
+// ErrSqlUniqueConstraintViolation is an error type which represents a database
+// agnostic SQL unique constraint violation.
+type ErrSqlUniqueConstraintViolation struct {
+	DbError error
+}
+
+func (e ErrSqlUniqueConstraintViolation) Error() string {
+	return fmt.Sprintf("sql unique constraint violation: %v", e.DbError)
+}
diff --git a/loopdb/sqlite.go b/loopdb/sqlite.go
new file mode 100644
index 0000000..81d269e
--- /dev/null
+++ b/loopdb/sqlite.go
@@ -0,0 +1,221 @@
+package loopdb
+
+import (
+	"context"
+	"database/sql"
+	"fmt"
+	"net/url"
+	"path/filepath"
+	"testing"
+
+	"github.com/btcsuite/btcd/chaincfg"
+	sqlite_migrate "github.com/golang-migrate/migrate/v4/database/sqlite"
+	"github.com/lightninglabs/loop/loopdb/sqlc"
+
+	"github.com/stretchr/testify/require"
+	_ "modernc.org/sqlite" // Register relevant drivers.
+)
+
+const (
+	// sqliteOptionPrefix is the string prefix sqlite uses to set various
+	// options. This is used in the following format:
+	// * sqliteOptionPrefix || option_name = option_value.
+	sqliteOptionPrefix = "_pragma"
+)
+
+// SqliteConfig holds all the config arguments needed to interact with our
+// sqlite DB.
+type SqliteConfig struct {
+	// SkipMigrations if true, then all the database migrations will be
+	// skipped on start up.
+	SkipMigrations bool `long:"skipmigrations" description:"Skip applying migrations on startup."`
+
+	// DatabaseFileName is the full file path where the database file can
+	// be found.
+	DatabaseFileName string `long:"dbfile" description:"The full path to the database."`
+}
+
+// SqliteSwapStore is a sqlite3 based database for the loop daemon.
+type SqliteSwapStore struct {
+	cfg *SqliteConfig
+
+	*BaseDB
+}
+
+// NewSqliteStore attempts to open a new sqlite database based on the passed
+// config.
+func NewSqliteStore(cfg *SqliteConfig, network *chaincfg.Params) (*SqliteSwapStore, error) {
+	// The set of pragma options are accepted using query options. For now
+	// we only want to ensure that foreign key constraints are properly
+	// enforced.
+	pragmaOptions := []struct {
+		name  string
+		value string
+	}{
+		{
+			name:  "foreign_keys",
+			value: "on",
+		},
+		{
+			name:  "journal_mode",
+			value: "WAL",
+		},
+		{
+			name:  "busy_timeout",
+			value: "5000",
+		},
+	}
+	sqliteOptions := make(url.Values)
+	for _, option := range pragmaOptions {
+		sqliteOptions.Add(
+			sqliteOptionPrefix,
+			fmt.Sprintf("%v=%v", option.name, option.value),
+		)
+	}
+
+	// Construct the DSN which is just the database file name, appended
+	// with the series of pragma options as a query URL string. For more
+	// details on the formatting here, see the modernc.org/sqlite docs:
+	// https://pkg.go.dev/modernc.org/sqlite#Driver.Open.
+	dsn := fmt.Sprintf(
+		"%v?%v", cfg.DatabaseFileName, sqliteOptions.Encode(),
+	)
+	db, err := sql.Open("sqlite", dsn)
+	if err != nil {
+		return nil, err
+	}
+
+	if !cfg.SkipMigrations {
+		// Now that the database is open, populate the database with
+		// our set of schemas based on our embedded in-memory file
+		// system.
+		//
+		// First, we'll need to open up a new migration instance for
+		// our current target database: sqlite.
+		driver, err := sqlite_migrate.WithInstance(
+			db, &sqlite_migrate.Config{},
+		)
+		if err != nil {
+			return nil, err
+		}
+
+		err = applyMigrations(
+			sqlSchemas, driver, "sqlc/migrations", "sqlc",
+		)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	queries := sqlc.New(db)
+
+	return &SqliteSwapStore{
+		cfg: cfg,
+		BaseDB: &BaseDB{
+			DB:      db,
+			Queries: queries,
+			network: network,
+		},
+	}, nil
+}
+
+// NewTestSqliteDB is a helper function that creates an SQLite database for
+// testing.
+func NewTestSqliteDB(t *testing.T) *SqliteSwapStore {
+	t.Helper()
+
+	t.Logf("Creating new SQLite DB for testing")
+
+	dbFileName := filepath.Join(t.TempDir(), "tmp.db")
+	sqlDB, err := NewSqliteStore(&SqliteConfig{
+		DatabaseFileName: dbFileName,
+		SkipMigrations:   false,
+	}, &chaincfg.MainNetParams)
+	require.NoError(t, err)
+
+	t.Cleanup(func() {
+		require.NoError(t, sqlDB.DB.Close())
+	})
+
+	return sqlDB
+}
+
+// BaseDB is the base database struct that each implementation can embed to
+// gain some common functionality.
+type BaseDB struct {
+	network *chaincfg.Params
+
+	*sql.DB
+
+	*sqlc.Queries
+}
+
+// BeginTx wraps the normal sql specific BeginTx method with the TxOptions
+// interface. This interface is then mapped to the concrete sql tx options
+// struct.
+func (db *BaseDB) BeginTx(ctx context.Context,
+	opts TxOptions) (*sql.Tx, error) {
+
+	sqlOptions := sql.TxOptions{
+		ReadOnly: opts.ReadOnly(),
+	}
+	return db.DB.BeginTx(ctx, &sqlOptions)
+}
+
+// ExecTx is a wrapper for txBody to abstract the creation and commit of a db
+// transaction. The db transaction is embedded in a `*sqlc.Queries` that
+// txBody needs to use when executing each one of the queries that need to be
+// applied atomically.
+func (db *BaseDB) ExecTx(ctx context.Context, txOptions TxOptions,
+	txBody func(*sqlc.Queries) error) error {
+
+	// Create the db transaction.
+	tx, err := db.BeginTx(ctx, txOptions)
+	if err != nil {
+		return err
+	}
+
+	// Rollback is safe to call even if the tx is already closed, so if
+	// the tx commits successfully, this is a no-op.
+	defer tx.Rollback() //nolint: errcheck
+
+	if err := txBody(db.Queries.WithTx(tx)); err != nil {
+		return err
+	}
+
+	// Commit transaction.
+	if err = tx.Commit(); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// TxOptions represents a set of options one can use to control what type of
+// database transaction is created. Transactions can either be read or write.
+type TxOptions interface {
+	// ReadOnly returns true if the transaction should be read only.
+	ReadOnly() bool
+}
+
+// SqliteTxOptions defines the set of db txn options the BaseDB
+// understands.
+type SqliteTxOptions struct {
+	// readOnly governs if a read only transaction is needed or not.
+	readOnly bool
+}
+
+// NewSqlReadOpts returns a new SqliteTxOptions instance that triggers a read
+// transaction.
+func NewSqlReadOpts() *SqliteTxOptions {
+	return &SqliteTxOptions{
+		readOnly: true,
+	}
+}
+
+// ReadOnly returns true if the transaction should be read only.
+//
+// NOTE: This is part of the TxOptions interface.
+func (r *SqliteTxOptions) ReadOnly() bool {
+	return r.readOnly
+}
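
For reference, a minimal usage sketch of the API introduced by this patch (not part of the diff itself): it opens the store with NewSqliteStore and runs a closure inside a single read-only transaction via ExecTx. The package name, the exampleUsage function, and the "loop.db" file name are illustrative only, and the closure body stays empty because this patch does not yet add any generated sqlc queries.

package example

import (
	"context"

	"github.com/btcsuite/btcd/chaincfg"
	"github.com/lightninglabs/loop/loopdb"
	"github.com/lightninglabs/loop/loopdb/sqlc"
)

// exampleUsage is a sketch of how a caller could open the sqlite store and
// execute queries atomically through ExecTx.
func exampleUsage(ctx context.Context) error {
	// Open the store; migrations are applied because SkipMigrations
	// defaults to false.
	store, err := loopdb.NewSqliteStore(&loopdb.SqliteConfig{
		DatabaseFileName: "loop.db", // Illustrative path.
	}, &chaincfg.MainNetParams)
	if err != nil {
		return err
	}
	defer store.DB.Close()

	// Everything issued through q below shares one read-only database
	// transaction; it is rolled back automatically if the closure errors.
	return store.ExecTx(ctx, loopdb.NewSqlReadOpts(), func(q *sqlc.Queries) error {
		_ = q // Placeholder: call generated sqlc query methods here.
		return nil
	})
}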