Promscale metric data retention job
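This is a small standalone CLI that runs Promscale's per-metric retention work: it connects to the database, asks _prom_catalog which metrics have chunks past their retention period, and drops those chunks with a pool of workers, throttling itself when sends to the workers start to lag. Based on the flag and environment-variable definitions in the code below, an invocation would look roughly like the following (connection string and values are placeholders, not part of the gist):

DATABASE_URL=postgres://user:password@localhost:5432/promscale promscale-maintainer maintain --max-workers=3 --conn-lifetime=3m --log-level=debug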
package main

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	"github.com/alecthomas/kong"
	"github.com/jackc/pgx/v4/pgxpool"
	"github.com/rs/zerolog"
	"github.com/rs/zerolog/log"
)
var MaxWorkers = 4

type Work interface {
	Do(ctx context.Context, db *pgxpool.Pool) error
}
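// MetricRetentionWork identifies one metric whose expired chunks should be dropped.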
type MetricRetentionWork struct {
	ID          int
	TableSchema string
	MetricName  string
}
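// Do applies the retention policy for a single metric: it takes the metric's
// maintenance lock, drops chunks older than the metric's retention period via
// _prom_catalog.drop_metric_chunks, and then releases the lock again.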
func (w MetricRetentionWork) Do(ctx context.Context, db *pgxpool.Pool) error {
	start := time.Now()
	log := log.With().Str("metric_name", w.MetricName).Logger()
	log.Debug().Msg("handling metric")

	var locked bool
	if err := db.QueryRow(ctx, "SELECT _prom_catalog.lock_metric_for_maintenance($1, wait=>false)", w.ID).Scan(&locked); err != nil {
		return fmt.Errorf("failed to lock metric for maintenance: %w", err)
	}
	if !locked {
		log.Warn().Msgf("skipping metric, unable to lock")
		return nil
	}
	log.Debug().Msg("metric locked for maintenance")

	if _, err := db.Exec(ctx, "CALL _prom_catalog.drop_metric_chunks($1, $2, NOW() - _prom_catalog.get_metric_retention_period($1, $2), log_verbose=>true)", w.TableSchema, w.MetricName); err != nil {
		return fmt.Errorf("failed to drop metric chunks: %w", err)
	}
	log.Debug().Msg("called drop_metric_chunks")

	if _, err := db.Exec(ctx, "SELECT _prom_catalog.unlock_metric_for_maintenance($1)", w.ID); err != nil {
		return fmt.Errorf("failed to unlock metric: %w", err)
	}
	log.Debug().Msg("unlocked metric")

	// not sure if this helps
	if _, err := db.Exec(ctx, "COMMIT"); err != nil {
		return fmt.Errorf("failed to commit: %w", err)
	}

	log.Info().Dur("duration", time.Since(start)).Msg("done handling metric retention work")
	return nil
}
type Globals struct {
	LogLevel string `type:"string" name:"log-level" default:"info" env:"LOG_LEVEL"`
}

type App struct {
	Globals

	Maintain MaintainCmd `cmd:"" default:""`
}

type MaintainCmd struct {
	DatabaseURL  string        `type:"string" name:"database-url" required:"" env:"DATABASE_URL"`
	MaxWorkers   int32         `name:"max-workers" env:"MAX_WORKERS" default:"3"`
	ConnLifetime time.Duration `name:"conn-lifetime" env:"CONN_LIFETIME" default:"3m" help:"How long to let each db connection run before recycling it"`

	db *pgxpool.Pool
}
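// Run connects to the database, starts one work producer and MaxWorkers
// workers, and blocks until either all work has been processed or a
// termination signal arrives.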
func (c *MaintainCmd) Run() error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// create database connection config
	connCfg, err := pgxpool.ParseConfig(c.DatabaseURL)
	if err != nil {
		return fmt.Errorf("failed to parse database url: %w", err)
	}
	connCfg.MaxConnLifetime = c.ConnLifetime
	connCfg.MaxConns = c.MaxWorkers

	// connect to database
	db, err := pgxpool.ConnectConfig(ctx, connCfg)
	if err != nil {
		return fmt.Errorf("failed to connect to database: %w", err)
	}
	c.db = db
	defer c.db.Close()

	// start producing work
	errcs := []<-chan error{}
	workc, errc := c.produceWork(ctx)
	errcs = append(errcs, errc)
	for idx := 0; idx < int(c.MaxWorkers); idx++ {
		errc := c.startWorker(ctx, workc)
		errcs = append(errcs, errc)
	}
	donec := c.wait(errcs)

	sigc := make(chan os.Signal, 1)
	signal.Notify(sigc, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)

	select {
	case s := <-sigc:
		log.Info().Msgf("signal received [%v] cancelling everything ...", s)
		cancel()
	case <-donec:
		log.Info().Msg("done!")
	}
	return nil
}
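// wait drains every error channel, logging non-cancellation errors, and
// returns a channel that is closed once all error channels have been closed.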
func (c *MaintainCmd) wait(errcs []<-chan error) <-chan struct{} {
	out := make(chan struct{})
	var wg sync.WaitGroup

	waiter := func(errc <-chan error) {
		defer wg.Done()
		for err := range errc {
			if err != nil && !errors.Is(err, context.Canceled) {
				log.Error().Err(err).Msg("error occurred")
			}
		}
	}

	wg.Add(len(errcs))
	for _, errc := range errcs {
		go waiter(errc)
	}

	go func() {
		defer close(out)
		wg.Wait()
	}()
	return out
}
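// startWorker consumes work from workc until the channel is closed, reporting
// any failures on the returned error channel.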
func (c *MaintainCmd) startWorker(ctx context.Context, workc <-chan Work) <-chan error {
	errc := make(chan error, 1)
	go func() {
		defer close(errc)
		for work := range workc {
			log.Debug().Msgf("worker received work: %+v", work)
			if err := work.Do(ctx, c.db); err != nil {
				errc <- err
				continue
			}
		}
		log.Debug().Msgf("worker is done")
	}()
	return errc
}
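// produceWork repeatedly asks the database for metrics that need chunks
// dropped and feeds them to the workers. It keeps a rolling window of how
// long each send blocked, sleeps between units when that latency gets high,
// and stops once a batch comes back smaller than 50 entries.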
func (c *MaintainCmd) produceWork(ctx context.Context) (<-chan Work, <-chan error) {
	workc := make(chan Work)
	errc := make(chan error, 1)

	buflen := c.MaxWorkers * 3
	rollingLatencies := make([]time.Duration, buflen)
	var kalm time.Duration
	getAvgLatency := func() time.Duration {
		sum := time.Duration(0)
		for _, l := range rollingLatencies {
			sum += l
		}
		return time.Duration(float64(sum) / float64(len(rollingLatencies)) * float64(c.MaxWorkers))
	}

	go func() {
		defer close(workc)
		defer close(errc)
		stopWhenDone := false
		for {
			log.Info().Msg("looking for more work")
			work, err := c.createMetricRetentionWork(ctx, 100)
			if err != nil {
				errc <- err
				return
			}
			log.Info().Msgf("got %d units of fresh work", len(work))
			if len(work) < 50 {
				stopWhenDone = true
			}

			for idx, work := range work {
				start := time.Now()
				select {
				case workc <- work:
					log.Debug().Msgf("sent work")
					rollingLatencies[idx%len(rollingLatencies)] = time.Since(start)
				case <-ctx.Done():
					// produce no more work; workers will continue until their next cycle
					return
				}

				// every once in a while, check if we should increase calmness
				if idx%int(c.MaxWorkers) == 0 {
					avgLatency := getAvgLatency()
					// if latency is high, enhance our calm
					if avgLatency > 10*time.Second {
						log.Warn().Dur("average_latency", avgLatency).Msg("average latency is high, calming down ...")
						kalm = avgLatency
					} else {
						if kalm > 0 {
							log.Info().Dur("average_latency", avgLatency).Msg("speeding up again")
						}
						kalm = 0
					}
				}
				time.Sleep(kalm)
			}

			if stopWhenDone {
				log.Info().Msg("stopping work production")
				return
			}
		}
	}()
	return workc, errc
}
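// createMetricRetentionWork queries _prom_catalog.get_metrics_that_need_drop_chunk()
// for up to limit metrics and returns them as shuffled work items.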
func (c *MaintainCmd) createMetricRetentionWork(ctx context.Context, limit int) ([]Work, error) {
	work := []Work{}
	rows, err := c.db.Query(ctx, "select id, table_schema, metric_name from _prom_catalog.get_metrics_that_need_drop_chunk() limit $1", limit)
	if err != nil {
		return nil, fmt.Errorf("failed to query for metrics that need drop chunk: %w", err)
	}
	defer rows.Close()

	var w MetricRetentionWork
	for rows.Next() {
		if err := rows.Scan(&w.ID, &w.TableSchema, &w.MetricName); err != nil {
			return nil, fmt.Errorf("failed to scan row: %w", err)
		}
		work = append(work, w)
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("failed to read rows: %w", err)
	}

	// shuffle the work
	rand.Shuffle(len(work), func(i, j int) { work[i], work[j] = work[j], work[i] })
	return work, nil
}
func main() {
	app := App{
		Globals: Globals{},
	}
	cli := kong.Parse(
		&app,
		kong.Name("promscale-maintainer"),
		kong.UsageOnError(),
	)

	zerolog.DurationFieldUnit = time.Second
	if lvl, err := zerolog.ParseLevel(app.LogLevel); err != nil {
		cli.FatalIfErrorf(err)
	} else {
		zerolog.SetGlobalLevel(lvl)
	}

	cli.FatalIfErrorf(cli.Run(app.Globals))
}