Compare commits

...

5 Commits

Author SHA1 Message Date
1aca178eb3 Cleanup global functions
All checks were successful
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 37s
Signed-off-by: Jan Tytgat <jan.tytgat@corelayer.eu>
2025-04-29 22:42:27 +02:00
b573ec80c1 Refactor application
Signed-off-by: Jan Tytgat <jan.tytgat@corelayer.eu>
2025-04-29 22:39:49 +02:00
c0349917de Refactor httpd functions
Signed-off-by: Jan Tytgat <jan.tytgat@corelayer.eu>
2025-04-29 22:27:14 +02:00
aa56895853 Add slogd package back into repository, continued...
Signed-off-by: Jan Tytgat <jan.tytgat@corelayer.eu>
2025-04-29 22:26:54 +02:00
f8121b8ada Add slogd package back into repository
Signed-off-by: Jan Tytgat <jan.tytgat@corelayer.eu>
2025-04-29 22:26:20 +02:00
12 changed files with 360 additions and 80 deletions

View File

@ -7,8 +7,9 @@ import (
"log/slog" "log/slog"
"os/signal" "os/signal"
"git.flexabyte.io/flexabyte/go-slogd/slogd"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"git.flexabyte.io/flexabyte/go-kit/slogd"
) )
type Application interface { type Application interface {
@ -42,41 +43,21 @@ func (a *application) ExecuteContext(ctx context.Context) error {
// Result channel for command output // Result channel for command output
chExe := make(chan error) chExe := make(chan error)
// Run the application command // Run the application command using the signal context and output channel
a.config.Logger.LogAttrs(ctx, slogd.LevelTrace, "executing application context") a.config.Logger.LogAttrs(ctx, slogd.LevelTrace, "executing application context")
go func(ctx context.Context, chErr chan error) { go func(ctx context.Context, chErr chan error) {
chErr <- a.cmd.ExecuteContext(ctx) chErr <- a.cmd.ExecuteContext(ctx)
}(sigCtx, chExe) }(sigCtx, chExe)
// Wait for command output or a shutdown signal // Wait for command output or a shutdown signal
var err error
select { select {
// sigCtx.Done() returns a channel that will have a message when the context is canceled. case <-sigCtx.Done(): // sigCtx.Done() returns a channel that will have a message when the context is canceled.
// Alternatively, chExe will receive the response from the execution context if the application finishes.
case <-sigCtx.Done():
sigCancel() sigCancel()
return a.handleShutdownSignal(ctx)
// Adapt the shutdown scenario if a graceful shutdown period is configured case err := <-chExe: // Alternatively, chExe will receive the response from the execution context if the application finishes.
switch a.config.EnableGracefulShutdown && a.config.ShutdownTimeout > 0 {
case true:
a.config.Logger.LogAttrs(ctx, slogd.LevelTrace, "gracefully shutting down application")
if err = a.gracefulShutdown(ctx); !errors.Is(err, context.DeadlineExceeded) {
a.config.Logger.LogAttrs(ctx, slogd.LevelTrace, "graceful shutdown completed with error", slog.Any("error", err))
return err
}
a.config.Logger.LogAttrs(ctx, slogd.LevelTrace, "graceful shutdown completed")
return nil
case false:
a.config.Logger.LogAttrs(ctx, slogd.LevelTrace, "immediately shutting down application")
return nil
}
case err = <-chExe:
a.config.Logger.LogAttrs(ctx, slogd.LevelTrace, "application terminated successfully") a.config.Logger.LogAttrs(ctx, slogd.LevelTrace, "application terminated successfully")
return err return err
} }
a.config.Logger.LogAttrs(ctx, slogd.LevelTrace, "application terminated unexpectedly")
return nil
} }
func (a *application) gracefulShutdown(ctx context.Context) error { func (a *application) gracefulShutdown(ctx context.Context) error {
@ -90,10 +71,37 @@ func (a *application) gracefulShutdown(ctx context.Context) error {
defer sigCancel() // Ensure that this gets called. defer sigCancel() // Ensure that this gets called.
select { select {
case <-shutdownCtx.Done(): case <-shutdownCtx.Done(): // Timeout exceeded
return shutdownCtx.Err() return shutdownCtx.Err()
case <-sigCtx.Done(): case <-sigCtx.Done(): // Received additional shutdown signal to forcefully exit
fmt.Println("exiting...")
sigCancel() sigCancel()
return nil return fmt.Errorf("process killed")
}
}
func (a *application) handleGracefulShutdown(ctx context.Context) error {
a.config.Logger.LogAttrs(ctx, slogd.LevelTrace, "gracefully shutting down application")
var err error
if err = a.gracefulShutdown(ctx); !errors.Is(err, context.DeadlineExceeded) {
a.config.Logger.LogAttrs(ctx, slogd.LevelWarn, "graceful shutdown failed", slog.Any("error", err))
return nil
}
a.config.Logger.LogAttrs(ctx, slogd.LevelTrace, "graceful shutdown completed")
return nil
}
func (a *application) handleShutdownSignal(ctx context.Context) error {
// Adapt the shutdown scenario if a graceful shutdown period is configured
switch a.config.EnableGracefulShutdown && a.config.ShutdownTimeout > 0 {
case true:
return a.handleGracefulShutdown(ctx)
case false:
a.config.Logger.LogAttrs(ctx, slogd.LevelTrace, "immediately shutting down application")
return nil
default:
panic("cannot handle shutdown signal")
} }
} }

View File

@ -11,7 +11,7 @@ import (
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/spf13/pflag" "github.com/spf13/pflag"
"git.flexabyte.io/flexabyte/go-slogd/slogd" "git.flexabyte.io/flexabyte/go-kit/slogd"
) )
var DefaultShutdownSignals = []os.Signal{syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGQUIT} var DefaultShutdownSignals = []os.Signal{syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGQUIT}
@ -22,7 +22,6 @@ var (
persistentPreRunE []func(cmd *cobra.Command, args []string) error // collection of PreRunE functions persistentPreRunE []func(cmd *cobra.Command, args []string) error // collection of PreRunE functions
persistentPostRunE []func(cmd *cobra.Command, args []string) error // collection of PostRunE functions persistentPostRunE []func(cmd *cobra.Command, args []string) error // collection of PostRunE functions
outWriter io.Writer = os.Stdout outWriter io.Writer = os.Stdout
// version semver.Version
) )
func helpFuncE(cmd *cobra.Command, args []string) error { func helpFuncE(cmd *cobra.Command, args []string) error {
@ -30,17 +29,12 @@ func helpFuncE(cmd *cobra.Command, args []string) error {
} }
func normalizeFunc(f *pflag.FlagSet, name string) pflag.NormalizedName { func normalizeFunc(f *pflag.FlagSet, name string) pflag.NormalizedName {
// switch name {
// case "no-color":
// name = "log-type"
// break
// }
return pflag.NormalizedName(name) return pflag.NormalizedName(name)
} }
func persistentPreRunFuncE(cmd *cobra.Command, args []string) error { func persistentPreRunFuncE(cmd *cobra.Command, args []string) error {
slogd.SetLevel(slogd.Level(logLevelFlag)) slogd.SetLevel(slogd.Level(logLevelFlag))
// if slogd.ActiveHandler() == slogd_colored.HandlerColor && noColorFlag {
if slogd.ActiveHandler() != slogd.HandlerJSON && noColorFlag { if slogd.ActiveHandler() != slogd.HandlerJSON && noColorFlag {
slogd.UseHandler(slogd.HandlerText) slogd.UseHandler(slogd.HandlerText)
cmd.SetContext(slogd.WithContext(cmd.Context())) cmd.SetContext(slogd.WithContext(cmd.Context()))

View File

@ -3,8 +3,9 @@ package application
import ( import (
"log/slog" "log/slog"
"git.flexabyte.io/flexabyte/go-slogd/slogd"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"git.flexabyte.io/flexabyte/go-kit/slogd"
) )
const ( const (

View File

@ -5,14 +5,13 @@ import (
"log/slog" "log/slog"
"net/http" "net/http"
"os" "os"
"syscall"
"time" "time"
"git.flexabyte.io/flexabyte/go-slogd/slogd"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"git.flexabyte.io/flexabyte/go-kit/application" "git.flexabyte.io/flexabyte/go-kit/application"
"git.flexabyte.io/flexabyte/go-kit/httpd" "git.flexabyte.io/flexabyte/go-kit/httpd"
"git.flexabyte.io/flexabyte/go-kit/slogd"
) )
var ( var (
@ -55,18 +54,15 @@ func main() {
EnableGracefulShutdown: true, EnableGracefulShutdown: true,
Logger: slogd.Logger(), Logger: slogd.Logger(),
OverrideRunE: func(cmd *cobra.Command, args []string) error { OverrideRunE: func(cmd *cobra.Command, args []string) error {
// fmt.Println("overrideRunE")
// time.Sleep(1 * time.Second)
// fmt.Println("overrideRunE done")
mux := http.NewServeMux() mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
slogd.FromContext(r.Context()).LogAttrs(r.Context(), slogd.LevelInfo, "request received", slog.String("method", r.Method), slog.String("url", r.URL.String()), slog.String("user-agent", r.UserAgent())) slogd.FromContext(r.Context()).LogAttrs(r.Context(), slogd.LevelInfo, "request received", slog.String("method", r.Method), slog.String("url", r.URL.String()), slog.String("user-agent", r.UserAgent()))
}) })
return httpd.RunHttpServer(cmd.Context(), slogd.FromContext(cmd.Context()), "127.0.0.1", 28000, mux, 1*time.Second) return httpd.RunHttpServer(cmd.Context(), slogd.Logger(), "127.0.0.1", 28000, mux, 5*time.Second)
}, },
PersistentPreRunE: nil, PersistentPreRunE: nil,
PersistentPostRunE: nil, PersistentPostRunE: nil,
ShutdownSignals: []os.Signal{syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGQUIT}, ShutdownSignals: application.DefaultShutdownSignals,
ShutdownTimeout: 5 * time.Second, ShutdownTimeout: 5 * time.Second,
SubCommands: nil, SubCommands: nil,
SubCommandInitializeFunc: nil, SubCommandInitializeFunc: nil,

5
go.mod
View File

@ -3,7 +3,8 @@ module git.flexabyte.io/flexabyte/go-kit
go 1.24.2 go 1.24.2
require ( require (
git.flexabyte.io/flexabyte/go-slogd v0.0.0-20250428200220-8e65f81d9450 github.com/samber/slog-formatter v1.2.0
github.com/samber/slog-multi v1.4.0
github.com/spf13/cobra v1.9.1 github.com/spf13/cobra v1.9.1
github.com/spf13/pflag v1.0.6 github.com/spf13/pflag v1.0.6
) )
@ -11,7 +12,5 @@ require (
require ( require (
github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/samber/lo v1.50.0 // indirect github.com/samber/lo v1.50.0 // indirect
github.com/samber/slog-formatter v1.2.0 // indirect
github.com/samber/slog-multi v1.4.0 // indirect
golang.org/x/text v0.24.0 // indirect golang.org/x/text v0.24.0 // indirect
) )

2
go.sum
View File

@ -1,5 +1,3 @@
git.flexabyte.io/flexabyte/go-slogd v0.0.0-20250428200220-8e65f81d9450 h1:VCstITW9pMgR5EnSLU83UiK7llLOguFLAo26VOOnzrI=
git.flexabyte.io/flexabyte/go-slogd v0.0.0-20250428200220-8e65f81d9450/go.mod h1:rL08OHw4aycfjkZOS8pBfLapeG3IZHxIInW29hVVSrI=
github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g= github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=

View File

@ -3,12 +3,14 @@ package httpd
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"log/slog" "log/slog"
"net"
"net/http" "net/http"
"strconv" "strconv"
"time" "time"
"git.flexabyte.io/flexabyte/go-slogd/slogd" "git.flexabyte.io/flexabyte/go-kit/slogd"
) )
func RunHttpServer(ctx context.Context, log *slog.Logger, listenAddress string, port int, h http.Handler, shutdownTimeout time.Duration) error { func RunHttpServer(ctx context.Context, log *slog.Logger, listenAddress string, port int, h http.Handler, shutdownTimeout time.Duration) error {
@ -16,39 +18,14 @@ func RunHttpServer(ctx context.Context, log *slog.Logger, listenAddress string,
Addr: listenAddress + ":" + strconv.Itoa(port), Addr: listenAddress + ":" + strconv.Itoa(port),
Handler: h} Handler: h}
return run(ctx, s, log, shutdownTimeout) log.LogAttrs(ctx, slogd.LevelTrace, "starting http server", slog.String("listenAddress", fmt.Sprintf("http://%s", s.Addr)))
}
func RunSocketHttpServer(ctx context.Context, log *slog.Logger, socketPath string, h http.Handler, shutdownTimeout time.Duration) error { shutdownCtx, shutdownCancel := context.WithCancel(ctx)
s := &http.Server{ defer shutdownCancel()
Handler: h}
return run(ctx, s, log, shutdownTimeout) // Run goroutine to handle graceful shutdown
}
func run(ctx context.Context, s *http.Server, log *slog.Logger, shutdownTimeout time.Duration) error {
log.LogAttrs(ctx, slogd.LevelTrace, "starting http server", slog.String("listenAddress", s.Addr))
idleConnectionsClosed := make(chan struct{}) idleConnectionsClosed := make(chan struct{})
go shutdown(shutdownCtx, log, s, shutdownTimeout, idleConnectionsClosed)
runCtx, runCancel := context.WithCancel(ctx)
defer runCancel()
go func(ctx context.Context) {
log.LogAttrs(ctx, slogd.LevelTrace, "awaiting shutdown signal for http server", slog.String("listenAddress", s.Addr))
<-ctx.Done()
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), shutdownTimeout)
defer shutdownCancel()
log.LogAttrs(shutdownCtx, slogd.LevelTrace, "shutting down http server", slog.String("listenAddress", s.Addr))
// We received an interrupt signal, shut down.
if err := s.Shutdown(shutdownCtx); err != nil {
// Error from closing listeners, or context timeout:
log.LogAttrs(ctx, slogd.LevelTrace, "shutting down http server failed", slog.String("listenAddress", s.Addr), slog.Any("error", err))
}
log.LogAttrs(shutdownCtx, slogd.LevelTrace, "shutting down http server completed", slog.String("listenAddress", s.Addr))
close(idleConnectionsClosed)
}(runCtx)
var err error var err error
if err = s.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) { if err = s.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
@ -60,3 +37,54 @@ func run(ctx context.Context, s *http.Server, log *slog.Logger, shutdownTimeout
<-idleConnectionsClosed <-idleConnectionsClosed
return err return err
} }
func RunSocketHttpServer(ctx context.Context, log *slog.Logger, socketPath string, h http.Handler, shutdownTimeout time.Duration) error {
s := &http.Server{
Handler: h}
log.LogAttrs(ctx, slogd.LevelTrace, "starting http server", slog.String("socket", s.Addr))
shutdownCtx, shutdownCancel := context.WithCancel(ctx)
defer shutdownCancel()
// Run goroutine to handle graceful shutdown
idleConnectionsClosed := make(chan struct{})
go shutdown(shutdownCtx, log, s, shutdownTimeout, idleConnectionsClosed)
var err error
var config = new(net.ListenConfig)
var socket net.Listener
if socket, err = config.Listen(ctx, "unix", socketPath); err != nil {
log.LogAttrs(ctx, slogd.LevelError, "failed to listen on socket", slog.String("error", err.Error()))
return err
}
if err = s.Serve(socket); err != nil && !errors.Is(err, http.ErrServerClosed) {
// Error starting or closing listener:
log.LogAttrs(ctx, slogd.LevelError, "http server start failed", slog.String("error", err.Error()))
return err
}
<-idleConnectionsClosed
return err
}
func shutdown(ctx context.Context, log *slog.Logger, s *http.Server, shutdownTimeout time.Duration, idleConnectionsClosed chan struct{}) {
log.LogAttrs(ctx, slogd.LevelTrace, "awaiting shutdown signal for http server", slog.String("listenAddress", s.Addr))
<-ctx.Done()
// When shutdown signal is received, create a new context with the configured shutdown timeout
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), shutdownTimeout)
defer shutdownCancel()
log.LogAttrs(shutdownCtx, slogd.LevelTrace, "shutdown signal received for http server", slog.String("listenAddress", s.Addr))
time.Sleep(2 * time.Second)
// We received an interrupt signal, shut down.
if err := s.Shutdown(shutdownCtx); err != nil {
// Error from closing listeners, or context timeout:
log.LogAttrs(ctx, slogd.LevelTrace, "shutdown for http server failed", slog.String("listenAddress", s.Addr), slog.Any("error", err))
}
log.LogAttrs(shutdownCtx, slogd.LevelTrace, "shutdown for http server completed", slog.String("listenAddress", s.Addr))
close(idleConnectionsClosed)
}

32
slogd/disabledHandler.go Normal file
View File

@ -0,0 +1,32 @@
package slogd
import (
"context"
"log/slog"
)
func newDisabledHandler() slog.Handler {
return &disabledHandler{}
}
func registerDisabledHandler(activate bool) {
RegisterSink(handlerDisabled, newDisabledHandler(), activate)
}
type disabledHandler struct{}
func (h *disabledHandler) Handle(ctx context.Context, r slog.Record) error {
return nil
}
func (h *disabledHandler) Enabled(ctx context.Context, level slog.Level) bool {
return false
}
func (h *disabledHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
return h
}
func (h *disabledHandler) WithGroup(group string) slog.Handler {
return h
}

10
slogd/jsonHandler.go Normal file
View File

@ -0,0 +1,10 @@
package slogd
import (
"io"
"log/slog"
)
func RegisterJSONHandler(w io.Writer, activate bool) {
RegisterSink(HandlerJSON, slog.NewJSONHandler(w, HandlerOptions()), activate)
}

56
slogd/level.go Normal file
View File

@ -0,0 +1,56 @@
package slogd
import (
"log/slog"
"strings"
)
const (
LevelTrace = slog.Level(-8)
LevelDebug = slog.LevelDebug
LevelInfo = slog.LevelInfo
LevelNotice = slog.Level(2)
LevelWarn = slog.LevelWarn
LevelError = slog.LevelError
LevelFatal = slog.Level(12)
LevelDefault = LevelInfo
)
var levelNames = map[slog.Leveler]string{
LevelTrace: "TRACE",
LevelDebug: "DEBUG",
LevelInfo: "INFO",
LevelNotice: "NOTICE",
LevelWarn: "WARN",
LevelError: "ERROR",
LevelFatal: "FATAL",
}
func ReplaceAttrs(groups []string, a slog.Attr) slog.Attr {
if a.Key == slog.LevelKey {
a.Value = slog.StringValue(LevelName(a.Value.Any().(slog.Level)))
}
return a
}
func Level(l string) slog.Level {
mux.Lock()
defer mux.Unlock()
for k, v := range levelNames {
if strings.ToUpper(l) == v {
return k.Level()
}
}
return LevelDefault
}
func LevelName(l slog.Level) string {
mux.Lock()
defer mux.Unlock()
for k, v := range levelNames {
if k == l {
return v
}
}
return levelNames[LevelDefault]
}

148
slogd/slogd.go Normal file
View File

@ -0,0 +1,148 @@
package slogd
import (
"context"
"log/slog"
"sync"
slogformatter "github.com/samber/slog-formatter"
slogmulti "github.com/samber/slog-multi"
)
const (
HandlerText string = "text"
HandlerJSON string = "json"
handlerDisabled string = "disabled"
)
const (
FlowFanOut Flow = iota
FlowPipeline
FlowRouting
FlowFailOver
FlowLoadBalancing
)
type Flow int
var (
ctxKey = contextKey{}
)
var (
handlers = make(map[string]slog.Handler)
activeHandler string
level = new(slog.LevelVar)
formatters []slogformatter.Formatter
middlewares []slogmulti.Middleware
source bool
logger *slog.Logger
mux = &sync.Mutex{}
)
func ActiveHandler() string {
mux.Lock()
defer mux.Unlock()
return activeHandler
}
func key() contextKey {
return ctxKey
}
func Disable() {
UseHandler(handlerDisabled)
}
func FromContext(ctx context.Context) *slog.Logger {
if l, ok := ctx.Value(key()).(*slog.Logger); ok {
return l
}
return Logger()
}
func GetLevel() slog.Level {
return level.Level()
}
func GetLevelString() string {
return level.String()
}
func HandlerOptions() *slog.HandlerOptions {
return &slog.HandlerOptions{
AddSource: source,
Level: level,
ReplaceAttr: ReplaceAttrs,
}
}
func Init(l slog.Level, addSource bool) {
level.Set(l)
source = addSource
}
func Logger() *slog.Logger {
mux.Lock()
defer mux.Unlock()
if logger == nil {
logger = slog.New(handlers[handlerDisabled])
}
return logger
}
func SetLevel(l slog.Level) {
mux.Lock()
defer mux.Unlock()
level.Set(l)
}
func RegisterFormatter(f slogformatter.Formatter) {
mux.Lock()
defer mux.Unlock()
formatters = append(formatters, f)
}
func RegisterMiddleware(h slogmulti.Middleware) {
mux.Lock()
defer mux.Unlock()
middlewares = append(middlewares, h)
}
func RegisterSink(name string, h slog.Handler, activate bool) {
mux.Lock()
handlers[name] = h
mux.Unlock()
if activate {
UseHandler(name)
}
}
func UseHandler(name string) {
mux.Lock()
defer mux.Unlock()
if _, ok := handlers[name]; !ok {
Logger().LogAttrs(context.Background(), LevelError, "could not find handler", slog.String("name", name))
return
}
formatterPipe := slogformatter.NewFormatterMiddleware(formatters...)
pipe := slogmulti.Pipe(middlewares...).Pipe(formatterPipe)
handler := slogmulti.Fanout(handlers[name])
logger = slog.New(pipe.Handler(handler))
activeHandler = name
}
func WithContext(ctx context.Context) context.Context {
return context.WithValue(ctx, key(), Logger())
}
func init() {
// RegisterFormatter(LevelFormatter())
// RegisterMiddleware(NewLevelMiddleware())
registerDisabledHandler(true)
}
type contextKey struct{}

10
slogd/textHandler.go Normal file
View File

@ -0,0 +1,10 @@
package slogd
import (
"io"
"log/slog"
)
func RegisterTextHandler(w io.Writer, activate bool) {
RegisterSink(HandlerText, slog.NewTextHandler(w, HandlerOptions()), activate)
}