diff --git a/asyncsearcher/async_searcher.go b/asyncsearcher/async_searcher.go index eab092cf..538e066c 100644 --- a/asyncsearcher/async_searcher.go +++ b/asyncsearcher/async_searcher.go @@ -73,7 +73,12 @@ type AsyncSearcherConfig struct { MaxSizePerRequest int } -func MustStartAsync(config AsyncSearcherConfig, mp MappingProvider, fracs fracmanager.List) *AsyncSearcher { +type fractionAcquirer interface { + Fractions() fracmanager.List + AcquireFraction(name string) (_ frac.Fraction, release func(), ok bool) +} + +func MustStartAsync(config AsyncSearcherConfig, mp MappingProvider, fracs fractionAcquirer) *AsyncSearcher { if config.DataDir == "" { logger.Fatal("can't start async searcher: DataDir is empty") } @@ -154,10 +159,10 @@ type asyncSearchInfo struct { infoSize *atomic.Int64 } -func newAsyncSearchInfo(r AsyncSearchRequest, list fracmanager.List) asyncSearchInfo { - fracsToSearch := make([]fracSearchState, 0, len(list)) - for _, f := range list { - fracsToSearch = append(fracsToSearch, fracSearchState{Name: f.Info().Name()}) +func newAsyncSearchInfo(r AsyncSearchRequest, fracNames []string) asyncSearchInfo { + fracsToSearch := make([]fracSearchState, 0, len(fracNames)) + for _, name := range fracNames { + fracsToSearch = append(fracsToSearch, fracSearchState{Name: name}) } ctx, cancel := context.WithCancel(context.Background()) return asyncSearchInfo{ @@ -204,7 +209,7 @@ func (i *asyncSearchInfo) Status() AsyncSearchStatus { return status } -func (as *AsyncSearcher) StartSearch(r AsyncSearchRequest, fracs fracmanager.List) error { +func (as *AsyncSearcher) StartSearch(r AsyncSearchRequest, fracs fractionAcquirer) error { if as.readOnly.Load() { return fmt.Errorf("cannot start search on read-only mode") } @@ -235,7 +240,8 @@ func (as *AsyncSearcher) StartSearch(r AsyncSearchRequest, fracs fracmanager.Lis return fmt.Errorf("retention time should be less than %s, got %s", maxRetention, r.Retention) } - if ok := as.saveSearchInfo(r, fracs); !ok { + fracNames := fracs.Fractions().FilterInRange(r.Params.From, r.Params.To).Names() + if ok := as.saveSearchInfo(r, fracNames); !ok { // Request was saved previously, skip it return nil } @@ -245,13 +251,13 @@ func (as *AsyncSearcher) StartSearch(r AsyncSearchRequest, fracs fracmanager.Lis return nil } -func (as *AsyncSearcher) saveSearchInfo(r AsyncSearchRequest, fracs fracmanager.List) bool { +func (as *AsyncSearcher) saveSearchInfo(r AsyncSearchRequest, fracNames []string) bool { as.requestsMu.Lock() defer as.requestsMu.Unlock() if _, ok := as.requests[r.ID]; ok { return false } - info := newAsyncSearchInfo(r, fracs) + info := newAsyncSearchInfo(r, fracNames) as.storeSearchInfoLocked(r.ID, info) return true } @@ -295,7 +301,7 @@ func (as *AsyncSearcher) createDataDir() { }) } -func (as *AsyncSearcher) processRequest(asyncSearchID string, fracs fracmanager.List) { +func (as *AsyncSearcher) processRequest(asyncSearchID string, fracs fractionAcquirer) { defer as.processWg.Done() as.rateLimit <- struct{}{} @@ -305,7 +311,7 @@ func (as *AsyncSearcher) processRequest(asyncSearchID string, fracs fracmanager. asyncSearchActiveSearches.Add(-1) } -func (as *AsyncSearcher) doSearch(id string, fracs fracmanager.List) { +func (as *AsyncSearcher) doSearch(id string, fracs fractionAcquirer) { qprPaths, err := as.findQPRs(id) if err != nil { panic(fmt.Errorf("can't find QPRs for id %q: %s", id, err)) @@ -340,11 +346,6 @@ func (as *AsyncSearcher) doSearch(id string, fracs fracmanager.List) { info.Request.Params.AST = ast.Root } - fracsByName := make(map[string]frac.Fraction) - for _, f := range fracs { - fracsByName[f.Info().Name()] = f - } - for _, fracInfo := range info.Fractions { if _, ok := processedFracs[fracInfo.Name]; ok { continue @@ -352,13 +353,7 @@ func (as *AsyncSearcher) doSearch(id string, fracs fracmanager.List) { if as.shouldStopSearch(id) { break } - - f, ok := fracsByName[fracInfo.Name] - if !ok { // oldest fracs may already be removed - logger.Info("async search: skip missing fraction", zap.String("id", id), zap.String("frac", fracInfo.Name)) - continue - } - if err := as.processFrac(f, info); err != nil { + if err := as.acquireAndProcessFrac(fracInfo, info, fracs); err != nil { as.updateSearchInfo(id, func(info *asyncSearchInfo) { info.Error = err.Error() }) @@ -400,6 +395,21 @@ func compressQPR(qpr *seq.QPR, cb func(compressed []byte) error) error { return nil } +func (as *AsyncSearcher) acquireAndProcessFrac(fracInfo fracSearchState, searchInfo asyncSearchInfo, fracs fractionAcquirer) (err error) { + f, release, ok := fracs.AcquireFraction(fracInfo.Name) + defer release() + + if !ok { // oldest fracs may already be removed + logger.Info( + "async search: skip missing fraction", + zap.String("id", searchInfo.Request.ID), + zap.String("frac", fracInfo.Name), + ) + return + } + return as.processFrac(f, searchInfo) +} + func (as *AsyncSearcher) processFrac(f frac.Fraction, info asyncSearchInfo) (err error) { defer func() { if panicData := util.RecoverToError(recover(), asyncSearchPanics); panicData != nil { diff --git a/asyncsearcher/async_searcher_test.go b/asyncsearcher/async_searcher_test.go index f403aa44..8ef37bc3 100644 --- a/asyncsearcher/async_searcher_test.go +++ b/asyncsearcher/async_searcher_test.go @@ -11,6 +11,7 @@ import ( "github.com/ozontech/seq-db/frac" "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/frac/processor" + "github.com/ozontech/seq-db/fracmanager" "github.com/ozontech/seq-db/mappingprovider" "github.com/ozontech/seq-db/seq" ) @@ -25,6 +26,10 @@ func (f *fakeFrac) Info() *common.Info { return &f.info } +func (f *fakeFrac) IsIntersecting(from, to seq.MID) bool { + return true +} + func (f *fakeFrac) Search(context.Context, processor.SearchParams) (*seq.QPR, error) { return &f.dp.qpr, nil } @@ -33,6 +38,21 @@ type fakeDP struct { qpr seq.QPR } +type fakeFractionProvider fracmanager.List + +func (fp fakeFractionProvider) AcquireFraction(name string) (frac.Fraction, func(), bool) { + for _, f := range fp { + if f.Info().Name() == name { + return f, func() {}, true + } + } + return nil, func() {}, false +} + +func (fp fakeFractionProvider) Fractions() fracmanager.List { + return fracmanager.List(fp) +} + func TestAsyncSearcherMaintain(t *testing.T) { r := require.New(t) @@ -50,7 +70,8 @@ func TestAsyncSearcherMaintain(t *testing.T) { Query: "*", Retention: time.Hour, } - fracs := []frac.Fraction{ + + fracs := fakeFractionProvider{ &fakeFrac{info: common.Info{Path: "1"}}, } r.NoError(as.StartSearch(req, fracs)) diff --git a/fracmanager/fracmanager.go b/fracmanager/fracmanager.go index b35066a9..6b5b7c87 100644 --- a/fracmanager/fracmanager.go +++ b/fracmanager/fracmanager.go @@ -95,6 +95,10 @@ func New(ctx context.Context, cfg *Config, s3cli *s3.Client, skipMaskProvider sk return &fm, stop, nil } +func (fm *FracManager) AcquireFraction(name string) (frac.Fraction, func(), bool) { + return fm.lc.registry.AcquireFraction(name) +} + func (fm *FracManager) Fractions() List { return fm.lc.registry.AllFractions() } diff --git a/fracmanager/fraction_registry.go b/fracmanager/fraction_registry.go index c8a12eef..0f79c28d 100644 --- a/fracmanager/fraction_registry.go +++ b/fracmanager/fraction_registry.go @@ -34,6 +34,7 @@ type fractionRegistry struct { muAll sync.RWMutex // protects active, all, and oldestTotal fields active *activeProxy // currently active writable fraction all []frac.Fraction // all fractions in creation order (read-only view) + allMap map[string]frac.Fraction } // NewFractionRegistry creates and initializes a new fraction registry instance. @@ -82,6 +83,14 @@ func (r *fractionRegistry) Active() *activeProxy { return r.active } +func (r *fractionRegistry) AcquireFraction(name string) (frac.Fraction, func(), bool) { + r.muAll.RLock() + defer r.muAll.RUnlock() + + f, ok := r.allMap[name] + return f, func() {}, ok +} + // AllFractions returns a read-only view of all fractions in creation order. func (r *fractionRegistry) AllFractions() []frac.Fraction { r.muAll.RLock() @@ -215,6 +224,7 @@ func (r *fractionRegistry) addActive(a *activeProxy) { r.active = a r.all = append(r.all, a.proxy) + r.allMap[a.instance.Info().Name()] = a.proxy } // trimAll removes the oldest fractions from the complete fractions list. @@ -223,6 +233,9 @@ func (r *fractionRegistry) trimAll(count int) { r.muAll.Lock() defer r.muAll.Unlock() + for _, f := range r.all[:count] { + delete(r.allMap, f.Info().Name()) + } r.all = r.all[count:] r.updateOldestTotal() } @@ -439,26 +452,34 @@ func (r *fractionRegistry) removeFromOffloading(sealed *sealedProxy) { // Expensive O(n) operation used when direct list modification is insufficient. func (r *fractionRegistry) rebuildAllFractions() { all := make([]frac.Fraction, 0, len(r.all)) + allMap := make(map[string]frac.Fraction, len(r.all)) + + add := func(f frac.Fraction) { + all = append(all, f) + allMap[f.Info().Name()] = f + } // collect fractions in correct chronological order: from oldest (remote) to newest (active) for _, remote := range r.remotes { - all = append(all, remote.proxy) + add(remote.proxy) } for _, offloaded := range r.offloading { - all = append(all, offloaded.proxy) + add(offloaded.proxy) } for _, sealed := range r.sealed { - all = append(all, sealed.proxy) + add(sealed.proxy) } for _, active := range r.sealing { - all = append(all, active.proxy) + add(active.proxy) } - all = append(all, r.active.proxy) + + add(r.active.proxy) r.muAll.Lock() defer r.muAll.Unlock() r.all = all + r.allMap = allMap r.updateOldestTotal() } diff --git a/fracmanager/list.go b/fracmanager/list.go index 078ed02d..dba6c37b 100644 --- a/fracmanager/list.go +++ b/fracmanager/list.go @@ -56,7 +56,7 @@ func (l List) Sort(order seq.DocsOrder) { } func (l List) FilterInRange(from, to seq.MID) List { - res := make(List, 0) + res := make(List, 0, len(l)) for _, f := range l { if f.IsIntersecting(from, to) { res = append(res, f) @@ -71,3 +71,11 @@ func (l *List) Shift(n int) []frac.Fraction { *l = (*l)[n:] return res } + +func (l List) Names() []string { + res := make([]string, 0, len(l)) + for _, f := range l { + res = append(res, f.Info().Name()) + } + return res +} diff --git a/skipmaskmanager/skip_mask_manager.go b/skipmaskmanager/skip_mask_manager.go index 055cca33..47522a91 100644 --- a/skipmaskmanager/skip_mask_manager.go +++ b/skipmaskmanager/skip_mask_manager.go @@ -44,6 +44,11 @@ type MappingProvider interface { GetMapping() seq.Mapping } +type fractionAcquirer interface { + Fractions() fracmanager.List + AcquireFraction(name string) (_ frac.Fraction, release func(), ok bool) +} + // Config holds configuration parameters for SkipMaskManager. type Config struct { DataDir string // Directory to store skip mask files @@ -137,7 +142,7 @@ func New( // - Begins asynchronous processing of all skip mask queries // // This method must be called before using the manager. -func (smm *SkipMaskManager) Start(fracs fracmanager.List) { +func (smm *SkipMaskManager) Start(fracs fractionAcquirer) { smm.createDataDir() err := smm.loadSkipMasks() @@ -145,7 +150,7 @@ func (smm *SkipMaskManager) Start(fracs fracmanager.List) { logger.Fatal("failed to load previous skip masks", zap.Error(err)) } - err = smm.buildQueue(fracs) + err = smm.buildQueue(fracs.Fractions()) if err != nil { logger.Fatal("failed to build skip mask manager queue", zap.Error(err)) } @@ -166,7 +171,7 @@ func (smm *SkipMaskManager) Start(fracs fracmanager.List) { } sm.ast = ast - smm.processSkipMask(sm, fracs.FilterInRange(sm.params.From, sm.params.To)) + smm.processSkipMask(sm, fracs) } }() } @@ -434,17 +439,7 @@ func (smm *SkipMaskManager) buildQueue(fracs fracmanager.List) error { // It processes each fraction with a .queue file, running search queries in parallel // (limited by the rate limiter). Each successful search writes results to a .skipmask // file. The skip mask status is set to Done when all fractions are processed. -func (smm *SkipMaskManager) processSkipMask(skipMask *SkipMask, fracs fracmanager.List) { - if len(fracs) == 0 { - skipMask.setStatus(StatusDone) - return - } - - fracsByName := make(map[string]frac.Fraction) - for _, f := range fracs { - fracsByName[f.Info().Name()] = f - } - +func (smm *SkipMaskManager) processSkipMask(skipMask *SkipMask, fracs fractionAcquirer) { skipMaskDes, err := os.ReadDir(skipMask.dirPath) if err != nil { panic(fmt.Errorf("BUG: reading directory must be successful: %s", err)) @@ -453,11 +448,6 @@ func (smm *SkipMaskManager) processSkipMask(skipMask *SkipMask, fracs fracmanage inProgress.Add(1) processFracInQueue := func(name string) error { - f, ok := fracsByName[fracNameFromFilePath(name)] - if !ok { // skip missing fracs - return nil - } - select { case <-smm.ctx.Done(): return nil @@ -466,6 +456,13 @@ func (smm *SkipMaskManager) processSkipMask(skipMask *SkipMask, fracs fracmanage go func() { defer skipMask.processWg.Done() defer func() { <-smm.rateLimit }() + + f, release, ok := fracs.AcquireFraction(fracNameFromFilePath(name)) + defer release() + if !ok { // skip missing fracs + return + } + if err := smm.processFrac(f, skipMask); err != nil { if errors.Is(err, context.Canceled) { logger.Info("skip mask manager refresh frac context cancelled") diff --git a/storeapi/grpc_async_search.go b/storeapi/grpc_async_search.go index 518ed19f..6f0310d4 100644 --- a/storeapi/grpc_async_search.go +++ b/storeapi/grpc_async_search.go @@ -46,8 +46,7 @@ func (g *GrpcV1) StartAsyncSearch( Retention: r.Retention.AsDuration(), WithDocs: r.WithDocs, } - fracs := g.fracManager.Fractions().FilterInRange(seq.MillisToMID(uint64(r.From)), seq.MillisToMID(uint64(r.To))) - if err := g.asyncSearcher.StartSearch(req, fracs); err != nil { + if err := g.asyncSearcher.StartSearch(req, g.fracManager); err != nil { return nil, err } diff --git a/storeapi/grpc_v1.go b/storeapi/grpc_v1.go index b9246998..3471b3f0 100644 --- a/storeapi/grpc_v1.go +++ b/storeapi/grpc_v1.go @@ -114,10 +114,7 @@ func NewGrpcV1(cfg APIConfig, fracManager *fracmanager.FracManager, mappingProvi fetchData: fetchData{ docFetcher: fracmanager.NewFetcher(config.FetchWorkers), }, - asyncSearcher: asyncsearcher.MustStartAsync( - cfg.Search.Async, mappingProvider, - fracManager.Fractions(), - ), + asyncSearcher: asyncsearcher.MustStartAsync(cfg.Search.Async, mappingProvider, fracManager), } return g diff --git a/storeapi/store.go b/storeapi/store.go index 5ea0c5a7..dd53079e 100644 --- a/storeapi/store.go +++ b/storeapi/store.go @@ -72,7 +72,7 @@ func NewStore( return nil, fmt.Errorf("loading fractions error: %w", err) } - skipMaskManager.Start(fracManager.Fractions()) + skipMaskManager.Start(fracManager) return &Store{ Config: c,