Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 33 additions & 23 deletions asyncsearcher/async_searcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,12 @@ type AsyncSearcherConfig struct {
MaxSizePerRequest int
}

func MustStartAsync(config AsyncSearcherConfig, mp MappingProvider, fracs fracmanager.List) *AsyncSearcher {
// fractionAcquirer is the fraction source the async searcher works against:
// it can list fractions and hand out a single fraction by name.
type fractionAcquirer interface {
// Fractions returns the provider's fractions as a fracmanager.List.
Fractions() fracmanager.List
// AcquireFraction looks a fraction up by name. ok reports whether it was
// found; release is the callback the caller invokes when it is done with the
// fraction. NOTE(review): what release actually does is up to the
// implementation — confirm before relying on it.
AcquireFraction(name string) (_ frac.Fraction, release func(), ok bool)
}

func MustStartAsync(config AsyncSearcherConfig, mp MappingProvider, fracs fractionAcquirer) *AsyncSearcher {
if config.DataDir == "" {
logger.Fatal("can't start async searcher: DataDir is empty")
}
Expand Down Expand Up @@ -154,10 +159,10 @@ type asyncSearchInfo struct {
infoSize *atomic.Int64
}

func newAsyncSearchInfo(r AsyncSearchRequest, list fracmanager.List) asyncSearchInfo {
fracsToSearch := make([]fracSearchState, 0, len(list))
for _, f := range list {
fracsToSearch = append(fracsToSearch, fracSearchState{Name: f.Info().Name()})
func newAsyncSearchInfo(r AsyncSearchRequest, fracNames []string) asyncSearchInfo {
fracsToSearch := make([]fracSearchState, 0, len(fracNames))
for _, name := range fracNames {
fracsToSearch = append(fracsToSearch, fracSearchState{Name: name})
}
ctx, cancel := context.WithCancel(context.Background())
return asyncSearchInfo{
Expand Down Expand Up @@ -204,7 +209,7 @@ func (i *asyncSearchInfo) Status() AsyncSearchStatus {
return status
}

func (as *AsyncSearcher) StartSearch(r AsyncSearchRequest, fracs fracmanager.List) error {
func (as *AsyncSearcher) StartSearch(r AsyncSearchRequest, fracs fractionAcquirer) error {
if as.readOnly.Load() {
return fmt.Errorf("cannot start search on read-only mode")
}
Expand Down Expand Up @@ -235,7 +240,8 @@ func (as *AsyncSearcher) StartSearch(r AsyncSearchRequest, fracs fracmanager.Lis
return fmt.Errorf("retention time should be less than %s, got %s", maxRetention, r.Retention)
}

if ok := as.saveSearchInfo(r, fracs); !ok {
fracNames := fracs.Fractions().FilterInRange(r.Params.From, r.Params.To).Names()
if ok := as.saveSearchInfo(r, fracNames); !ok {
// Request was saved previously, skip it
return nil
}
Expand All @@ -245,13 +251,13 @@ func (as *AsyncSearcher) StartSearch(r AsyncSearchRequest, fracs fracmanager.Lis
return nil
}

func (as *AsyncSearcher) saveSearchInfo(r AsyncSearchRequest, fracs fracmanager.List) bool {
func (as *AsyncSearcher) saveSearchInfo(r AsyncSearchRequest, fracNames []string) bool {
as.requestsMu.Lock()
defer as.requestsMu.Unlock()
if _, ok := as.requests[r.ID]; ok {
return false
}
info := newAsyncSearchInfo(r, fracs)
info := newAsyncSearchInfo(r, fracNames)
as.storeSearchInfoLocked(r.ID, info)
return true
}
Expand Down Expand Up @@ -295,7 +301,7 @@ func (as *AsyncSearcher) createDataDir() {
})
}

func (as *AsyncSearcher) processRequest(asyncSearchID string, fracs fracmanager.List) {
func (as *AsyncSearcher) processRequest(asyncSearchID string, fracs fractionAcquirer) {
defer as.processWg.Done()

as.rateLimit <- struct{}{}
Expand All @@ -305,7 +311,7 @@ func (as *AsyncSearcher) processRequest(asyncSearchID string, fracs fracmanager.
asyncSearchActiveSearches.Add(-1)
}

func (as *AsyncSearcher) doSearch(id string, fracs fracmanager.List) {
func (as *AsyncSearcher) doSearch(id string, fracs fractionAcquirer) {
qprPaths, err := as.findQPRs(id)
if err != nil {
panic(fmt.Errorf("can't find QPRs for id %q: %s", id, err))
Expand Down Expand Up @@ -340,25 +346,14 @@ func (as *AsyncSearcher) doSearch(id string, fracs fracmanager.List) {
info.Request.Params.AST = ast.Root
}

fracsByName := make(map[string]frac.Fraction)
for _, f := range fracs {
fracsByName[f.Info().Name()] = f
}

for _, fracInfo := range info.Fractions {
if _, ok := processedFracs[fracInfo.Name]; ok {
continue
}
if as.shouldStopSearch(id) {
break
}

f, ok := fracsByName[fracInfo.Name]
if !ok { // oldest fracs may already be removed
logger.Info("async search: skip missing fraction", zap.String("id", id), zap.String("frac", fracInfo.Name))
continue
}
if err := as.processFrac(f, info); err != nil {
if err := as.acquireAndProcessFrac(fracInfo, info, fracs); err != nil {
as.updateSearchInfo(id, func(info *asyncSearchInfo) {
info.Error = err.Error()
})
Expand Down Expand Up @@ -400,6 +395,21 @@ func compressQPR(qpr *seq.QPR, cb func(compressed []byte) error) error {
return nil
}

func (as *AsyncSearcher) acquireAndProcessFrac(fracInfo fracSearchState, searchInfo asyncSearchInfo, fracs fractionAcquirer) (err error) {
f, release, ok := fracs.AcquireFraction(fracInfo.Name)
defer release()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


if !ok { // oldest fracs may already be removed
logger.Info(
"async search: skip missing fraction",
zap.String("id", searchInfo.Request.ID),
zap.String("frac", fracInfo.Name),
)
return
}
return as.processFrac(f, searchInfo)
}

func (as *AsyncSearcher) processFrac(f frac.Fraction, info asyncSearchInfo) (err error) {
defer func() {
if panicData := util.RecoverToError(recover(), asyncSearchPanics); panicData != nil {
Expand Down
23 changes: 22 additions & 1 deletion asyncsearcher/async_searcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/ozontech/seq-db/frac"
"github.com/ozontech/seq-db/frac/common"
"github.com/ozontech/seq-db/frac/processor"
"github.com/ozontech/seq-db/fracmanager"
"github.com/ozontech/seq-db/mappingprovider"
"github.com/ozontech/seq-db/seq"
)
Expand All @@ -25,6 +26,10 @@ func (f *fakeFrac) Info() *common.Info {
return &f.info
}

// IsIntersecting always reports true, whatever from and to are, so the fake
// fraction is never dropped by a range check.
func (f *fakeFrac) IsIntersecting(from, to seq.MID) bool {
return true
}

// Search ignores its context and search parameters and returns a pointer to
// the QPR stored in f.dp, with a nil error.
func (f *fakeFrac) Search(context.Context, processor.SearchParams) (*seq.QPR, error) {
return &f.dp.qpr, nil
}
Expand All @@ -33,6 +38,21 @@ type fakeDP struct {
qpr seq.QPR
}

// fakeFractionProvider is a test double that serves fractions directly from an
// in-memory fracmanager.List.
type fakeFractionProvider fracmanager.List

// AcquireFraction returns the first fraction in the list whose name equals
// name. The release callback is always a no-op; the boolean reports whether a
// matching fraction was found.
func (fp fakeFractionProvider) AcquireFraction(name string) (frac.Fraction, func(), bool) {
	noop := func() {}
	for i := range fp {
		if candidate := fp[i]; candidate.Info().Name() == name {
			return candidate, noop, true
		}
	}
	return nil, noop, false
}

// Fractions exposes the underlying slice as a fracmanager.List.
func (fp fakeFractionProvider) Fractions() fracmanager.List {
	all := fracmanager.List(fp)
	return all
}

func TestAsyncSearcherMaintain(t *testing.T) {
r := require.New(t)

Expand All @@ -50,7 +70,8 @@ func TestAsyncSearcherMaintain(t *testing.T) {
Query: "*",
Retention: time.Hour,
}
fracs := []frac.Fraction{

fracs := fakeFractionProvider{
&fakeFrac{info: common.Info{Path: "1"}},
}
r.NoError(as.StartSearch(req, fracs))
Expand Down
4 changes: 4 additions & 0 deletions fracmanager/fracmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ func New(ctx context.Context, cfg *Config, s3cli *s3.Client, skipMaskProvider sk
return &fm, stop, nil
}

// AcquireFraction looks a fraction up by name by delegating to the fraction
// registry. It returns the fraction, a release callback, and a flag reporting
// whether the fraction was found.
func (fm *FracManager) AcquireFraction(name string) (frac.Fraction, func(), bool) {
return fm.lc.registry.AcquireFraction(name)
}

// Fractions returns the list of all fractions as reported by the fraction
// registry's AllFractions.
func (fm *FracManager) Fractions() List {
return fm.lc.registry.AllFractions()
}
Expand Down
31 changes: 26 additions & 5 deletions fracmanager/fraction_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type fractionRegistry struct {
muAll sync.RWMutex // protects active, all, and oldestTotal fields
active *activeProxy // currently active writable fraction
all []frac.Fraction // all fractions in creation order (read-only view)
allMap map[string]frac.Fraction
}

// NewFractionRegistry creates and initializes a new fraction registry instance.
Expand Down Expand Up @@ -82,6 +83,14 @@ func (r *fractionRegistry) Active() *activeProxy {
return r.active
}

func (r *fractionRegistry) AcquireFraction(name string) (frac.Fraction, func(), bool) {
r.muAll.RLock()
defer r.muAll.RUnlock()

f, ok := r.allMap[name]
return f, func() {}, ok
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does it always return an empty function? Is it going to be used later?

}

// AllFractions returns a read-only view of all fractions in creation order.
func (r *fractionRegistry) AllFractions() []frac.Fraction {
r.muAll.RLock()
Expand Down Expand Up @@ -215,6 +224,7 @@ func (r *fractionRegistry) addActive(a *activeProxy) {

r.active = a
r.all = append(r.all, a.proxy)
r.allMap[a.instance.Info().Name()] = a.proxy
}

// trimAll removes the oldest fractions from the complete fractions list.
Expand All @@ -223,6 +233,9 @@ func (r *fractionRegistry) trimAll(count int) {
r.muAll.Lock()
defer r.muAll.Unlock()

for _, f := range r.all[:count] {
delete(r.allMap, f.Info().Name())
}
r.all = r.all[count:]
r.updateOldestTotal()
}
Expand Down Expand Up @@ -439,26 +452,34 @@ func (r *fractionRegistry) removeFromOffloading(sealed *sealedProxy) {
// Expensive O(n) operation used when direct list modification is insufficient.
func (r *fractionRegistry) rebuildAllFractions() {
all := make([]frac.Fraction, 0, len(r.all))
allMap := make(map[string]frac.Fraction, len(r.all))

add := func(f frac.Fraction) {
all = append(all, f)
allMap[f.Info().Name()] = f
}

// collect fractions in correct chronological order: from oldest (remote) to newest (active)
for _, remote := range r.remotes {
all = append(all, remote.proxy)
add(remote.proxy)
}
for _, offloaded := range r.offloading {
all = append(all, offloaded.proxy)
add(offloaded.proxy)
}
for _, sealed := range r.sealed {
all = append(all, sealed.proxy)
add(sealed.proxy)
}
for _, active := range r.sealing {
all = append(all, active.proxy)
add(active.proxy)
}
all = append(all, r.active.proxy)

add(r.active.proxy)

r.muAll.Lock()
defer r.muAll.Unlock()

r.all = all
r.allMap = allMap
r.updateOldestTotal()
}

Expand Down
10 changes: 9 additions & 1 deletion fracmanager/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (l List) Sort(order seq.DocsOrder) {
}

func (l List) FilterInRange(from, to seq.MID) List {
res := make(List, 0)
res := make(List, 0, len(l))
for _, f := range l {
if f.IsIntersecting(from, to) {
res = append(res, f)
Expand All @@ -71,3 +71,11 @@ func (l *List) Shift(n int) []frac.Fraction {
*l = (*l)[n:]
return res
}

// Names returns the name of every fraction in the list, in list order.
// The result is always non-nil and has exactly len(l) elements.
func (l List) Names() []string {
	names := make([]string, len(l))
	for i := range l {
		names[i] = l[i].Info().Name()
	}
	return names
}
35 changes: 16 additions & 19 deletions skipmaskmanager/skip_mask_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ type MappingProvider interface {
GetMapping() seq.Mapping
}

// fractionAcquirer is the fraction source the skip mask manager works against:
// it can list fractions and hand out a single fraction by name.
type fractionAcquirer interface {
// Fractions returns the provider's fractions as a fracmanager.List.
Fractions() fracmanager.List
// AcquireFraction looks a fraction up by name. ok reports whether it was
// found; release is the callback the caller invokes when it is done with the
// fraction. NOTE(review): what release actually does is up to the
// implementation — confirm before relying on it.
AcquireFraction(name string) (_ frac.Fraction, release func(), ok bool)
}

// Config holds configuration parameters for SkipMaskManager.
type Config struct {
DataDir string // Directory to store skip mask files
Expand Down Expand Up @@ -137,15 +142,15 @@ func New(
// - Begins asynchronous processing of all skip mask queries
//
// This method must be called before using the manager.
func (smm *SkipMaskManager) Start(fracs fracmanager.List) {
func (smm *SkipMaskManager) Start(fracs fractionAcquirer) {
smm.createDataDir()

err := smm.loadSkipMasks()
if err != nil {
logger.Fatal("failed to load previous skip masks", zap.Error(err))
}

err = smm.buildQueue(fracs)
err = smm.buildQueue(fracs.Fractions())
if err != nil {
logger.Fatal("failed to build skip mask manager queue", zap.Error(err))
}
Expand All @@ -166,7 +171,7 @@ func (smm *SkipMaskManager) Start(fracs fracmanager.List) {
}
sm.ast = ast

smm.processSkipMask(sm, fracs.FilterInRange(sm.params.From, sm.params.To))
smm.processSkipMask(sm, fracs)
}
}()
}
Expand Down Expand Up @@ -434,17 +439,7 @@ func (smm *SkipMaskManager) buildQueue(fracs fracmanager.List) error {
// It processes each fraction with a .queue file, running search queries in parallel
// (limited by the rate limiter). Each successful search writes results to a .skipmask
// file. The skip mask status is set to Done when all fractions are processed.
func (smm *SkipMaskManager) processSkipMask(skipMask *SkipMask, fracs fracmanager.List) {
if len(fracs) == 0 {
skipMask.setStatus(StatusDone)
return
}

fracsByName := make(map[string]frac.Fraction)
for _, f := range fracs {
fracsByName[f.Info().Name()] = f
}

func (smm *SkipMaskManager) processSkipMask(skipMask *SkipMask, fracs fractionAcquirer) {
skipMaskDes, err := os.ReadDir(skipMask.dirPath)
if err != nil {
panic(fmt.Errorf("BUG: reading directory must be successful: %s", err))
Expand All @@ -453,11 +448,6 @@ func (smm *SkipMaskManager) processSkipMask(skipMask *SkipMask, fracs fracmanage
inProgress.Add(1)

processFracInQueue := func(name string) error {
f, ok := fracsByName[fracNameFromFilePath(name)]
if !ok { // skip missing fracs
return nil
}

select {
case <-smm.ctx.Done():
return nil
Expand All @@ -466,6 +456,13 @@ func (smm *SkipMaskManager) processSkipMask(skipMask *SkipMask, fracs fracmanage
go func() {
defer skipMask.processWg.Done()
defer func() { <-smm.rateLimit }()

f, release, ok := fracs.AcquireFraction(fracNameFromFilePath(name))
defer release()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now, release() is a no-op, but that will change after #277 is merged.
Are you sure it is a good idea to release the fraction even if it was not found?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe defer release() after the `if !ok` check?

if !ok { // skip missing fracs
return
}

if err := smm.processFrac(f, skipMask); err != nil {
if errors.Is(err, context.Canceled) {
logger.Info("skip mask manager refresh frac context cancelled")
Expand Down
3 changes: 1 addition & 2 deletions storeapi/grpc_async_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ func (g *GrpcV1) StartAsyncSearch(
Retention: r.Retention.AsDuration(),
WithDocs: r.WithDocs,
}
fracs := g.fracManager.Fractions().FilterInRange(seq.MillisToMID(uint64(r.From)), seq.MillisToMID(uint64(r.To)))
if err := g.asyncSearcher.StartSearch(req, fracs); err != nil {
if err := g.asyncSearcher.StartSearch(req, g.fracManager); err != nil {
return nil, err
}

Expand Down
Loading
Loading