engine

package
v3.7.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 26, 2026 License: AGPL-3.0 Imports: 67 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrPlanningFailed is returned when query planning fails unexpectedly.
	// ErrPlanningFailed is not used for unimplemented features, which returns
	// [ErrNotSupported] instead.
	ErrPlanningFailed = errors.New("query planning failed unexpectedly")

	// ErrSchedulingFailed is returned when communication with the scheduler fails.
	ErrSchedulingFailed = errors.New("failed to schedule query")
)
View Source
var ErrNotSupported = errors.New("feature not supported in new query engine")

Functions

func EnableParanoidMode added in v3.7.0

func EnableParanoidMode()

EnableParanoidMode turns on runtime assertions for execution pipelines that will check important invariants on input and output records, such as column names uniqueness and labels uniqueness. This affects performance if enabled.

func Handler added in v3.7.0

func Handler(
	cfg Config,
	logger log.Logger,
	engine *Engine,
	limits Limits,
	reg prometheus.Registerer,
) (http.Handler, error)

Handler returns an http.Handler for serving queries. Unsupported queries will result in an error.

func HandlerFromExecutor added in v3.7.0

func HandlerFromExecutor(cfg Config, logger log.Logger, exec QueryExecutor, limits Limits, reg prometheus.Registerer) (http.Handler, error)

HandlerFromExecutor is like Handler but accepts any QueryExecutor. Useful for testing with wrapped or mock executors.

func IsQuerySupported

func IsQuerySupported(params logql.Params) bool

func NewCacheMiddleware added in v3.7.0

func NewCacheMiddleware(
	logger log.Logger,
	limits Limits,
	metricCache cache.Cache,
	instantMetricCache cache.Cache,
	logCache cache.Cache,
	reg prometheus.Registerer,
) (queryrangebase.Middleware, error)

NewCacheMiddleware returns a single middleware that routes requests to one of three separate cache backends: metric range queries, instant metric queries, and log (non-metric) range queries.

func NewInstantMetricCacheMiddleware added in v3.7.0

func NewInstantMetricCacheMiddleware(
	logger log.Logger,
	limits Limits,
	c cache.Cache,
	metrics *queryrangebase.ResultsCacheMetrics,
) (queryrangebase.Middleware, error)

NewInstantMetricCacheMiddleware creates an instant metric results cache middleware for the Thor engine. It caches instant metric (SampleExpr) queries.

func NewLogResultCache added in v3.7.0

func NewLogResultCache(
	logger log.Logger,
	limits logCacheLimits,
	c cache.Cache,
	metrics *queryrange.LogResultCacheMetrics,
) (queryrangebase.Middleware, error)

NewLogResultCache creates a log result cache middleware for the Thor engine. It only caches empty results for log range queries; metric queries pass through to the metric cache middleware sitting above this one in the chain.

func NewLogResultCacheMetrics added in v3.7.0

func NewLogResultCacheMetrics(registerer prometheus.Registerer) *queryrange.LogResultCacheMetrics

NewLogResultCacheMetrics creates metrics for the engine log result cache using engine-specific Prometheus metric names.

func NewMetricCacheMiddleware added in v3.7.0

func NewMetricCacheMiddleware(
	logger log.Logger,
	limits Limits,
	c cache.Cache,
	metrics *queryrangebase.ResultsCacheMetrics,
) (queryrangebase.Middleware, error)

NewMetricCacheMiddleware creates a metric results cache middleware for the Thor engine. It only caches metric (SampleExpr) queries; log queries pass through untouched.

func NewResultsCacheMetrics added in v3.7.0

func NewResultsCacheMetrics(reg prometheus.Registerer) *queryrangebase.ResultsCacheMetrics

NewResultsCacheMetrics creates metrics for the engine results cache middleware.

Types

type Basic added in v3.7.0

type Basic struct {
	// contains filtered or unexported fields
}

Basic is a basic LogQL evaluation engine. Evaluation is performed sequentially, with no local or distributed parallelism.

func NewBasic added in v3.7.0

func NewBasic(cfg ExecutorConfig, ms metastore.Metastore, bucket objstore.Bucket, limits logql.Limits, reg prometheus.Registerer, logger log.Logger) *Basic

NewBasic creates a new instance of the basic query engine that implements the logql.Engine interface. The basic engine executes plans sequentially with no local or distributed parallelism.

func (*Basic) Execute added in v3.7.0

func (e *Basic) Execute(ctx context.Context, params logql.Params) (logqlmodel.Result, error)

Execute executes a LogQL query and returns its results or alternatively an error. The execution is done in three steps:

  1. Create a logical plan from the provided query parameters.
  2. Create a physical plan from the logical plan using information from the catalog.
  3. Evaluate the physical plan with the executor.

func (*Basic) Query added in v3.7.0

func (e *Basic) Query(params logql.Params) logql.Query

Query implements logql.Engine.

type Config

type Config struct {
	// Enable the next generation Loki Query Engine for supported queries.
	Enable      bool `yaml:"enable" category:"experimental"`
	Distributed bool `yaml:"distributed" category:"experimental"`

	// InterfaceNames specifies the list of network interfaces to use for
	// accepting incoming traffic. The public address of the instance is
	// inferred from the interfaces in this list.
	InterfaceNames []string `yaml:"instance_interface_names" doc:"default=[<private network interfaces>]" category:"experimental"`

	Executor ExecutorConfig `yaml:",inline"`
	Worker   WorkerConfig   `yaml:",inline"`

	StorageLag           time.Duration `yaml:"storage_lag" category:"experimental"`
	StorageStartDate     flagext.Time  `yaml:"storage_start_date" category:"experimental"`
	StorageRetentionDays int64         `yaml:"storage_retention_days" category:"experimental"`

	EnableEngineRouter       bool   `yaml:"enable_engine_router" category:"experimental"`
	DownstreamAddress        string `yaml:"downstream_address" category:"experimental"`
	EnableDeleteReqFiltering bool   `yaml:"enable_delete_req_filtering" category:"experimental"`
	EnforceRetentionPeriod   bool   `yaml:"enforce_retention_period" category:"experimental"`

	// Mutate incoming queries to align their start and end with their step.
	AlignQueriesWithStep bool `yaml:"align_queries_with_step" category:"experimental"`

	// EnforceQuerySeriesLimit enables enforcement of the max_query_series limit.
	// When enabled, the tenant's MaxQuerySeries limit is applied; otherwise, no limit is enforced.
	EnforceQuerySeriesLimit bool `yaml:"enforce_max_query_series_limit" category:"experimental"`

	ResultsCache queryrangebase.ResultsCacheConfig `yaml:"results_cache" category:"experimental"`
}

Config holds the configuration options to use with the next generation Loki Query Engine.

func (*Config) AdvertiseAddr added in v3.7.0

func (cfg *Config) AdvertiseAddr(listenPort uint16) (*net.TCPAddr, error)

AdvertiseAddress determines the TCP address to advertise for accepting incoming traffic from workers. Returns nil, nil if distributed execution is not enabled.

The provided listenPort is used to construct the TCP address to advertise.

func (*Config) RegisterFlags added in v3.7.0

func (cfg *Config) RegisterFlags(f *flag.FlagSet)

func (*Config) RegisterFlagsWithPrefix

func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)

func (*Config) ValidQueryRange

func (cfg *Config) ValidQueryRange() (time.Time, time.Time)

type Engine added in v3.7.0

type Engine struct {
	// contains filtered or unexported fields
}

Engine defines parameters for executing queries.

func New

func New(params Params) (*Engine, error)

New creates a new Engine.

func (*Engine) Execute added in v3.7.0

func (e *Engine) Execute(ctx context.Context, params logql.Params) (logqlmodel.Result, error)

Execute executes the given query. Execute returns ErrNotSupported if params denotes a query that is not yet implemented in the new engine.

type ExecutorConfig added in v3.7.0

type ExecutorConfig struct {
	// Batch size of the v2 execution engine.
	BatchSize int `yaml:"batch_size" category:"experimental"`

	// PrefetchBytes controls the number of bytes read ahead from a data object
	// when opening it.
	PrefetchBytes flagext.Bytes `yaml:"prefetch_bytes" category:"experimental"`

	// MergePrefetchCount controls the number of inputs that are prefetched simultaneously by any Merge node.
	MergePrefetchCount int `yaml:"merge_prefetch_count" category:"experimental"`

	// RangeConfig determines how to optimize range reads in the V2 engine.
	RangeConfig rangeio.Config `` /* 141-byte string literal not displayed */

	// StreamFilterer is an optional filterer that can filter streams based on their labels.
	// When set, streams are filtered before scanning.
	StreamFilterer executor.RequestStreamFilterer `yaml:"-"`
}

ExecutorConfig configures engine execution.

func (*ExecutorConfig) RegisterFlagsWithPrefix added in v3.7.0

func (cfg *ExecutorConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)

type InstantMetricCacheKeyGenerator added in v3.7.0

type InstantMetricCacheKeyGenerator struct {
	// contains filtered or unexported fields
}

InstantMetricCacheKeyGenerator generates cache keys for instant metric queries in the Thor (V2) query engine. It omits the step from the key because instant queries always have step=0.

func (InstantMetricCacheKeyGenerator) GenerateCacheKey added in v3.7.0

GenerateCacheKey generates a cache key based on userID, query, and time bucket.

type Limits added in v3.7.0

type Limits interface {
	querier_limits.Limits
	RetentionLimits

	MaxCacheFreshness(context.Context, string) time.Duration
	MaxQueryParallelism(context.Context, string) int
	EngineResultsCacheTimeBucketInterval(string) time.Duration
}

type LogCacheKeyGenerator added in v3.7.0

type LogCacheKeyGenerator struct {
	// contains filtered or unexported fields
}

LogCacheKeyGenerator implements queryrange.LogCacheKeyGenerator using EngineResultsCacheTimeBucketInterval instead of QuerySplitDuration.

func (*LogCacheKeyGenerator) GenerateCacheKey added in v3.7.0

func (g *LogCacheKeyGenerator) GenerateCacheKey(_ context.Context, tenantIDs []string, req *queryrange.LokiRequest) string

type MetricCacheKeyGenerator added in v3.7.0

type MetricCacheKeyGenerator struct {
	// contains filtered or unexported fields
}

MetricCacheKeyGenerator generates cache keys for the Thor (V2) query engine. It buckets keys by EngineResultsCacheTimeBucketInterval to allow cache sharing across queries that start within the same bucket.

func (MetricCacheKeyGenerator) GenerateCacheKey added in v3.7.0

func (s MetricCacheKeyGenerator) GenerateCacheKey(_ context.Context, userID string, r resultscache.Request) string

GenerateCacheKey generates a cache key based on the userID, query, step, and time bucket.

type Params added in v3.7.0

type Params struct {
	Logger     log.Logger            // Logger for optional log messages.
	Registerer prometheus.Registerer // Registerer for optional metrics.

	Config Config // Config for the Engine.

	Scheduler    *Scheduler          // Scheduler to manage the execution of tasks.
	Metastore    metastore.Metastore // Metastore to access the indexes
	Limits       logql.Limits        // Limits to apply to engine queries.
	DeleteGetter deletion.Getter     // DeleteGetter to fetch delete requests for query-time filtering.
}

Params holds parameters for constructing a new Engine.

type QueryExecutor added in v3.7.0

type QueryExecutor interface {
	Execute(ctx context.Context, params logql.Params) (logqlmodel.Result, error)
}

QueryExecutor is the interface satisfied by Engine, exposed for testing.

type RequestStreamFilterer added in v3.7.0

type RequestStreamFilterer = executor.RequestStreamFilterer

RequestStreamFilterer creates a StreamFilterer for a given request context.

type ResultBuilder

type ResultBuilder interface {
	CollectRecord(arrow.RecordBatch)
	Build(stats.Result, *metadata.Context) logqlmodel.Result
	Len() int
}

type RetentionCheckResult added in v3.7.0

type RetentionCheckResult struct {
	// If the query start was snapped to the retention boundary, this will
	// contain adjusted params. Otherwise, it returns the original params.
	Params logql.Params

	// EmptyResponse indicates the query should return an empty response
	// because the entire range is out of retention.
	EmptyResponse bool

	// Error is set if the query cannot be executed due to retention constraints.
	// This is used when stream retention makes the situation too complex.
	Error error
}

RetentionCheckResult represents the result of a retention check.

type RetentionLimits added in v3.7.0

type RetentionLimits interface {
	RetentionPeriod(userID string) time.Duration
	StreamRetention(userID string) []validation.StreamRetention
}

RetentionLimits provides access to tenant retention settings.

type Scheduler added in v3.7.0

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler is a service that can schedule tasks to connected Worker instances.

func NewScheduler added in v3.7.0

func NewScheduler(params SchedulerParams) (*Scheduler, error)

NewScheduler creates a new Scheduler. Use Scheduler.Service to manage the lifecycle of the Scheduler.

func (*Scheduler) RegisterMetrics added in v3.7.0

func (s *Scheduler) RegisterMetrics(reg prometheus.Registerer) error

RegisterMetrics registers metrics about s to report to reg.

func (*Scheduler) RegisterSchedulerServer added in v3.7.0

func (s *Scheduler) RegisterSchedulerServer(router *mux.Router)

RegisterSchedulerServer registers the wire.Listener of the inner scheduler as http.Handler on the provided router.

RegisterSchedulerServer is a no-op if an advertise address is not provided.

func (*Scheduler) Service added in v3.7.0

func (s *Scheduler) Service() services.Service

Service returns the service used to manage the lifecycle of the Scheduler.

func (*Scheduler) UnregisterMetrics added in v3.7.0

func (s *Scheduler) UnregisterMetrics(reg prometheus.Registerer)

UnregisterMetrics unregisters metrics about s from reg.

type SchedulerParams added in v3.7.0

type SchedulerParams struct {
	Logger log.Logger // Logger for optional log messages.

	// Address to advertise to workers. Must be set when the scheduler runs in
	// remote transport mode.
	//
	// If nil, the scheduler only listens for in-process connections.
	AdvertiseAddr net.Addr

	// Absolute path of the endpoint where the frame handler is registered.
	// Used for connecting to scheduler and other workers.
	Endpoint string
}

type StreamFilterer added in v3.7.0

type StreamFilterer = executor.StreamFilterer

StreamFilterer filters streams based on their labels.

type Worker added in v3.7.0

type Worker struct {
	// contains filtered or unexported fields
}

Worker requests tasks from a Scheduler and executes them. Task results are sent to other Worker instances or back to the Scheduler.

func NewWorker added in v3.7.0

func NewWorker(params WorkerParams) (*Worker, error)

NewWorker creates a new Worker instance. Use Worker.Service to manage the lifecycle of the Worker.

func (*Worker) RegisterMetrics added in v3.7.0

func (w *Worker) RegisterMetrics(reg prometheus.Registerer) error

RegisterMetrics registers metrics about w to report to reg.

func (*Worker) RegisterWorkerServer added in v3.7.0

func (w *Worker) RegisterWorkerServer(router *mux.Router)

RegisterWorkerServer registers the wire.Listener of the inner worker as http.Handler on the provided router.

RegisterWorkerServer is a no-op if an advertise address is not provided.

func (*Worker) Service added in v3.7.0

func (w *Worker) Service() services.Service

Service returns the service used to manage the lifecycle of the Worker.

func (*Worker) UnregisterMetrics added in v3.7.0

func (w *Worker) UnregisterMetrics(reg prometheus.Registerer)

UnregisterMetrics unregisters metrics about w from reg.

type WorkerConfig added in v3.7.0

type WorkerConfig struct {
	WorkerThreads int `yaml:"worker_threads" category:"experimental"`

	SchedulerLookupAddress  string        `yaml:"scheduler_lookup_address" category:"experimental"`
	SchedulerLookupInterval time.Duration `yaml:"scheduler_lookup_interval" category:"experimental"`
}

WorkerConfig represents the configuration for the Worker.

func (*WorkerConfig) RegisterFlagsWithPrefix added in v3.7.0

func (cfg *WorkerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)

type WorkerParams added in v3.7.0

type WorkerParams struct {
	Logger    log.Logger          // Logger for optional log messages.
	Bucket    objstore.Bucket     // Bucket to read stored data from.
	Metastore metastore.Metastore // Metastore to access indexes.

	Config   WorkerConfig   // Configuration for the worker.
	Executor ExecutorConfig // Configuration for task execution.

	// Local scheduler to connect to. If LocalScheduler is nil, the worker can
	// still connect to remote schedulers.
	LocalScheduler *Scheduler

	// Address to advertise to other workers and schedulers. Must be set when
	// the worker runs in remote transport mode.
	//
	// If nil, the worker only listens for in-process connections.
	AdvertiseAddr net.Addr

	// Absolute path of the endpoint where the frame handler is registered.
	// Used for connecting to scheduler and other workers.
	Endpoint string

	// StreamFilterer is an optional filterer that can filter streams based on their labels.
	// When set, streams are filtered before scanning.
	StreamFilterer executor.RequestStreamFilterer
}

WorkerParams holds parameters for constructing a new Worker.

Directories

Path Synopsis
internal
arrowagg
Package arrowagg provides utilities for aggregating Apache Arrow data structures.
Package arrowagg provides utilities for aggregating Apache Arrow data structures.
assertions
Assertions package provides a set of runtime assertions for extra safety.
Assertions package provides a set of runtime assertions for extra safety.
deletion
package deletion contains utilities for handling deletion requests in query engine.
package deletion contains utilities for handling deletion requests in query engine.
executor/matchutil
Package matchutil provides optimized string matching utilities for the query engine.
Package matchutil provides optimized string matching utilities for the query engine.
planner/logical
Package logical provides a logical query plan representation for data processing operations.
Package logical provides a logical query plan representation for data processing operations.
proto/ulid
Package ulid provides a ULID implementation.
Package ulid provides a ULID implementation.
scheduler
Package scheduler provides an implementation of workflow.Runner that works by scheduling tasks to be executed by a set of workers.
Package scheduler provides an implementation of workflow.Runner that works by scheduling tasks to be executed by a set of workers.
scheduler/wire
Package wire provides the wire protocol for how peers scheduler peers communicate.
Package wire provides the wire protocol for how peers scheduler peers communicate.
util/dag
Package dag provides utilities for working with directed acyclic graphs (DAGs).
Package dag provides utilities for working with directed acyclic graphs (DAGs).
util/ewma
Package ewma provides an implementation of an exponentially weighted moving average (EWMA) that can be reported as a Prometheus metric.
Package ewma provides an implementation of an exponentially weighted moving average (EWMA) that can be reported as a Prometheus metric.
util/objtest
Package objtest provides support for creating a data object storage directory for testing purposes.
Package objtest provides support for creating a data object storage directory for testing purposes.
util/queue/fair
Package fair implements a Hierarchical Fair Queue (HFQ), providing balanced service across a hierarchy of queues.
Package fair implements a Hierarchical Fair Queue (HFQ), providing balanced service across a hierarchy of queues.
worker
Package worker provides a mechanism to connect to the [scheduler] for executing tasks.
Package worker provides a mechanism to connect to the [scheduler] for executing tasks.
workflow
Package workflow defines how to represent physical plans as distributed workflows.
Package workflow defines how to represent physical plans as distributed workflows.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL