-
-
Save gedw99/434078435cc1eff3728c70f2fb65fd9f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package utils | |
import ( | |
"context" | |
"golang.org/x/sync/errgroup" | |
) | |
type ErrGroupSharedCtx struct { | |
eg *errgroup.Group | |
ctx context.Context | |
} | |
// CtxErrFunc is a unit of work that receives a context and reports failure
// through its returned error.
type CtxErrFunc func(context.Context) error
func NewErrGroupSharedCtx(ctx context.Context, funcs ...CtxErrFunc) *ErrGroupSharedCtx { | |
eg, ctx := errgroup.WithContext(ctx) | |
egCtx := &ErrGroupSharedCtx{ | |
eg: eg, | |
ctx: ctx, | |
} | |
egCtx.Go(funcs...) | |
return egCtx | |
} | |
func (egc *ErrGroupSharedCtx) Go(funcs ...CtxErrFunc) { | |
for _, f := range funcs { | |
fn := f | |
egc.eg.Go(func() error { | |
return fn(egc.ctx) | |
}) | |
} | |
} | |
func (egc *ErrGroupSharedCtx) Wait() error { | |
return egc.eg.Wait() | |
} | |
type ErrGroupSeparateCtx struct { | |
eg *errgroup.Group | |
} | |
func NewErrGroupSeparateCtx() *ErrGroupSeparateCtx { | |
eg := &errgroup.Group{} | |
egCtx := &ErrGroupSeparateCtx{ | |
eg: eg, | |
} | |
return egCtx | |
} | |
func (egc *ErrGroupSeparateCtx) Go(ctx context.Context, funcs ...CtxErrFunc) { | |
for _, f := range funcs { | |
fn := f | |
egc.eg.Go(func() error { | |
return fn(ctx) | |
}) | |
} | |
} | |
func (egc *ErrGroupSeparateCtx) Wait() error { | |
return egc.eg.Wait() | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package utils | |
import ( | |
"context" | |
"fmt" | |
"os" | |
"path/filepath" | |
"runtime" | |
"time" | |
"google.golang.org/protobuf/types/known/timestamppb" | |
"zombiezen.com/go/sqlite" | |
"zombiezen.com/go/sqlite/sqlitemigration" | |
"zombiezen.com/go/sqlite/sqlitex" | |
) | |
type Database struct { | |
write *sqlitex.Pool | |
read *sqlitex.Pool | |
} | |
type TxFn func(tx *sqlite.Conn) error | |
func NewDatabase(ctx context.Context, dbFilename string, migrations []string) (*Database, error) { | |
if err := os.MkdirAll(filepath.Dir(dbFilename), 0755); err != nil { | |
return nil, fmt.Errorf("could not create database directory: %w", err) | |
} | |
uri := fmt.Sprintf("file:%s?_journal_mode=WAL&_synchronous=NORMAL", dbFilename) | |
writePool, err := sqlitex.Open(uri, 0, 1) | |
if err != nil { | |
return nil, fmt.Errorf("could not open write pool: %w", err) | |
} | |
conn := writePool.Get(ctx) | |
defer writePool.Put(conn) | |
schema := sqlitemigration.Schema{ | |
Migrations: migrations, | |
} | |
if err := sqlitemigration.Migrate(ctx, conn, schema); err != nil { | |
return nil, fmt.Errorf("could not migrate event store: %w", err) | |
} | |
readPool, err := sqlitex.Open(uri, 0, runtime.NumCPU()) | |
if err != nil { | |
return nil, fmt.Errorf("could not open read pool: %w", err) | |
} | |
db := &Database{ | |
write: writePool, | |
read: readPool, | |
} | |
return db, nil | |
} | |
func (db *Database) Close() error { | |
if err := db.write.Close(); err != nil { | |
return fmt.Errorf("failed to close write pool: %w", err) | |
} | |
if err := db.read.Close(); err != nil { | |
return fmt.Errorf("failed to close read pool: %w", err) | |
} | |
return nil | |
} | |
func (db *Database) WriteTX(ctx context.Context, fn TxFn) (err error) { | |
conn := db.write.Get(ctx) | |
if conn == nil { | |
return fmt.Errorf("could not get write connection from pool") | |
} | |
defer db.write.Put(conn) | |
endFn, err := sqlitex.ImmediateTransaction(conn) | |
if err != nil { | |
return fmt.Errorf("could not start transaction: %w", err) | |
} | |
defer endFn(&err) | |
if err := fn(conn); err != nil { | |
return fmt.Errorf("could not execute write transaction: %w", err) | |
} | |
return nil | |
} | |
func (db *Database) ReadTX(ctx context.Context, fn TxFn) (err error) { | |
conn := db.read.Get(ctx) | |
if conn == nil { | |
return fmt.Errorf("could not get read connection from pool") | |
} | |
defer db.read.Put(conn) | |
endFn := sqlitex.Transaction(conn) | |
defer endFn(&err) | |
if err := fn(conn); err != nil { | |
return fmt.Errorf("could not execute read transaction: %w", err) | |
} | |
return nil | |
} | |
const (
	// secondsInADay is the number of seconds in one day, used to convert
	// between Unix seconds and fractional Julian days.
	secondsInADay = 86400
	// UnixEpochJulianDay is the Julian day of the Unix epoch
	// (1970-01-01T00:00:00Z).
	UnixEpochJulianDay = 2440587.5
)

// TimeToJulianDay converts a time.Time into a Julian day.
//
// Only the whole-second Unix timestamp of t is used; any sub-second part is
// discarded.
func TimeToJulianDay(t time.Time) float64 {
	return float64(t.UTC().Unix())/secondsInADay + UnixEpochJulianDay
}

// JulianDayToTime converts a Julian day into a time.Time in UTC with
// whole-second resolution.
//
// The Julian day is rounded to the nearest second rather than truncated. A
// float64 Julian day cannot represent most whole seconds exactly, so
// truncation could return a time one second earlier than the one passed to
// TimeToJulianDay; rounding makes that round trip exact.
func JulianDayToTime(d float64) time.Time {
	secs := (d - UnixEpochJulianDay) * secondsInADay
	// Round half away from zero; int64() alone truncates toward zero.
	if secs < 0 {
		secs -= 0.5
	} else {
		secs += 0.5
	}
	return time.Unix(int64(secs), 0).UTC()
}
func JulianNow() float64 { | |
return TimeToJulianDay(time.Now()) | |
} | |
func TimestampJulian(ts *timestamppb.Timestamp) float64 { | |
return TimeToJulianDay(ts.AsTime()) | |
} | |
func JulianDayToTimestamp(f float64) *timestamppb.Timestamp { | |
t := JulianDayToTime(f) | |
return timestamppb.New(t) | |
} | |
func JulianDayToTimestampStmt(stmt *sqlite.Stmt, param string) *timestamppb.Timestamp { | |
julianDays := stmt.GetFloat(param) | |
return JulianDayToTimestamp(julianDays) | |
} | |
func JulianDayToTimeStmt(stmt *sqlite.Stmt, param string) time.Time { | |
julianDays := stmt.GetFloat(param) | |
return JulianDayToTime(julianDays) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment