Skip to content

Commit

Permalink
feat(artifact): implement fast indexing for temporary catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
Yougigun committed Nov 30, 2024
1 parent 8f1c966 commit ac88c1b
Show file tree
Hide file tree
Showing 8 changed files with 1,230 additions and 414 deletions.
8 changes: 6 additions & 2 deletions cmd/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,14 @@ func main() {
}),
)

// activate file-to-embeddings worker pool
wp := worker.NewFileToEmbWorkerPool(ctx, service, config.Config.FileToEmbeddingWorker.NumberOfWorkers)
// activate persistent catalog file-to-embeddings worker pool
wp := worker.NewPersistentCatalogFileToEmbWorkerPool(ctx, service, config.Config.FileToEmbeddingWorker.NumberOfWorkers, artifactPB.CatalogType_CATALOG_TYPE_PERSISTENT)
wp.Start()

// activate temp(ephemeral) catalog file-to-embeddings worker pool
wpTemp := worker.NewTempCatalogFileToEmbWorkerPool(ctx, service, config.Config.FileToEmbeddingWorker.NumberOfWorkers, artifactPB.CatalogType_CATALOG_TYPE_EPHEMERAL)
wpTemp.Start()

// Start usage reporter
var usg usage.Usage
if config.Config.Server.Usage.Enabled {
Expand Down
50 changes: 28 additions & 22 deletions pkg/handler/knowledgebase.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,11 @@ func (ph *PublicHandler) CreateCatalog(ctx context.Context, req *artifactpb.Crea
service.NamespaceID + "/" + service.ConvertDocToMDPipelineID2 + "@" + service.DocToMDVersion2,
},
SplittingPipelines: []string{
service.NamespaceID + "/" + service.TextChunkPipelineID + "@" + service.TextSplitVersion,
service.NamespaceID + "/" + service.MdChunkPipelineID + "@" + service.MdSplitVersion,
service.NamespaceID + "/" + service.ChunkTextPipelineID + "@" + service.ChunkTextVersion,
service.NamespaceID + "/" + service.ChunkMdPipelineID + "@" + service.ChunkMdVersion,
},
EmbeddingPipelines: []string{
service.NamespaceID + "/" + service.TextEmbedPipelineID + "@" + service.TextEmbedVersion,
service.NamespaceID + "/" + service.EmbedTextPipelineID + "@" + service.EmbedTextVersion,
},
DownstreamApps: []string{},
TotalFiles: 0,
Expand Down Expand Up @@ -235,11 +235,11 @@ func (ph *PublicHandler) ListCatalogs(ctx context.Context, req *artifactpb.ListC
service.NamespaceID + "/" + service.ConvertDocToMDPipelineID2 + "@" + service.DocToMDVersion2,
},
SplittingPipelines: []string{
service.NamespaceID + "/" + service.TextChunkPipelineID + "@" + service.TextSplitVersion,
service.NamespaceID + "/" + service.MdChunkPipelineID + "@" + service.MdSplitVersion,
service.NamespaceID + "/" + service.ChunkTextPipelineID + "@" + service.ChunkTextVersion,
service.NamespaceID + "/" + service.ChunkMdPipelineID + "@" + service.ChunkMdVersion,
},
EmbeddingPipelines: []string{
service.NamespaceID + "/" + service.TextEmbedPipelineID + "@" + service.TextEmbedVersion,
service.NamespaceID + "/" + service.EmbedTextPipelineID + "@" + service.EmbedTextVersion,
},
DownstreamApps: []string{},
TotalFiles: uint32(fileCounts[kb.UID]),
Expand All @@ -262,7 +262,7 @@ func (ph *PublicHandler) UpdateCatalog(ctx context.Context, req *artifactpb.Upda
if req.CatalogId == "" {
log.Error("kb_id is empty", zap.Error(ErrCheckRequiredFields))
return nil, fmt.Errorf("kb_id is empty. err: %w", ErrCheckRequiredFields)
}
}

ns, err := ph.service.GetNamespaceByNsID(ctx, req.GetNamespaceId())
if err != nil {
Expand Down Expand Up @@ -318,22 +318,28 @@ func (ph *PublicHandler) UpdateCatalog(ctx context.Context, req *artifactpb.Upda
// populate response
return &artifactpb.UpdateCatalogResponse{
Catalog: &artifactpb.Catalog{
Name: kb.Name,
CatalogId: kb.KbID,
Description: kb.Description,
Tags: kb.Tags,
CreateTime: kb.CreateTime.String(),
UpdateTime: kb.UpdateTime.String(),
OwnerName: kb.Owner,
ConvertingPipelines: []string{service.NamespaceID + "/" + service.ConvertDocToMDPipelineID},
Name: kb.Name,
CatalogId: kb.KbID,
Description: kb.Description,
Tags: kb.Tags,
CreateTime: kb.CreateTime.String(),
UpdateTime: kb.UpdateTime.String(),
OwnerName: kb.Owner,
ConvertingPipelines: []string{
service.NamespaceID + "/" + service.ConvertDocToMDPipelineID,
service.NamespaceID + "/" + service.ConvertDocToMDPipelineID2,
},
SplittingPipelines: []string{
service.NamespaceID + "/" + service.TextChunkPipelineID,
service.NamespaceID + "/" + service.MdChunkPipelineID},
EmbeddingPipelines: []string{service.NamespaceID + "/" + service.TextEmbedPipelineID},
DownstreamApps: []string{},
TotalFiles: uint32(fileCounts[kb.UID]),
TotalTokens: uint32(tokenCounts[kb.UID]),
UsedStorage: uint64(kb.Usage),
service.NamespaceID + "/" + service.ChunkTextPipelineID,
service.NamespaceID + "/" + service.ChunkMdPipelineID,
},
EmbeddingPipelines: []string{
service.NamespaceID + "/" + service.EmbedTextPipelineID,
},
DownstreamApps: []string{},
TotalFiles: uint32(fileCounts[kb.UID]),
TotalTokens: uint32(tokenCounts[kb.UID]),
UsedStorage: uint64(kb.Usage),
},
}, nil
}
Expand Down
60 changes: 44 additions & 16 deletions pkg/mock/repository_i_mock.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

64 changes: 50 additions & 14 deletions pkg/repository/knowledgebasefile.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type KnowledgeBaseFileI interface {
// ProcessKnowledgeBaseFiles updates the process status of the files
ProcessKnowledgeBaseFiles(ctx context.Context, fileUIDs []string, requester uuid.UUID) ([]KnowledgeBaseFile, error)
// GetNeedProcessFiles returns the files that are not yet processed
GetNeedProcessFiles(ctx context.Context) []KnowledgeBaseFile
GetNeedProcessFiles(ctx context.Context, catalogType artifactpb.CatalogType) []KnowledgeBaseFile
// UpdateKnowledgeBaseFile updates the data and retrieves the latest data
UpdateKnowledgeBaseFile(ctx context.Context, fileUID string, updateMap map[string]interface{}) (*KnowledgeBaseFile, error)
// GetCountFilesByListKnowledgeBaseUID returns the number of files associated with the knowledge base UID
Expand Down Expand Up @@ -386,9 +386,11 @@ func (r *Repository) ProcessKnowledgeBaseFiles(
return files, nil
}

// GetNeedProcessFiles
func (r *Repository) GetNeedProcessFiles(ctx context.Context) []KnowledgeBaseFile {
// GetNeedProcessFiles returns the files that need to be processed in persistent catalogs
func (r *Repository) GetNeedProcessFiles(ctx context.Context, catalogType artifactpb.CatalogType) []KnowledgeBaseFile {
var files []KnowledgeBaseFile

// First get files that need processing
whereClause := fmt.Sprintf("%v IN ? AND %v is null", KnowledgeBaseFileColumn.ProcessStatus, KnowledgeBaseFileColumn.DeleteTime)
if err := r.db.WithContext(ctx).Where(
whereClause, []string{
Expand All @@ -400,7 +402,41 @@ func (r *Repository) GetNeedProcessFiles(ctx context.Context) []KnowledgeBaseFil
Find(&files).Error; err != nil {
return nil
}
return files

// Filter files to only include those from persistent catalogs
var result []KnowledgeBaseFile
// Get all unique knowledge base UIDs
kbUIDs := make([]uuid.UUID, 0)
kbUIDMap := make(map[uuid.UUID]bool)
for _, file := range files {
if !kbUIDMap[file.KnowledgeBaseUID] {
kbUIDs = append(kbUIDs, file.KnowledgeBaseUID)
kbUIDMap[file.KnowledgeBaseUID] = true
}
}

// Get all knowledge bases in one query
kbs, err := r.GetKnowledgeBasesByUIDs(ctx, kbUIDs)
if err != nil {
return nil
}

// Create map of persistent knowledge bases
persistentKBs := make(map[uuid.UUID]bool)
for _, kb := range kbs {
if kb.CatalogType == catalogType.String() {
persistentKBs[kb.UID] = true
}
}

// Filter files to only include those from persistent catalogs
for _, file := range files {
if persistentKBs[file.KnowledgeBaseUID] {
result = append(result, file)
}
}

return result
}

// UpdateKnowledgeBaseFile updates the data and retrieves the latest data
Expand Down Expand Up @@ -559,12 +595,12 @@ func (r *Repository) GetKnowledgebaseFileByKbUIDAndFileID(ctx context.Context, k
}

type SourceMeta struct {
OriginalFileUID uuid.UUID
OriginalFileUID uuid.UUID
OriginalFileName string
KbUID uuid.UUID
Dest string
CreateTime time.Time
UpdateTime time.Time
KbUID uuid.UUID
Dest string
CreateTime time.Time
UpdateTime time.Time
}

// GetTruthSourceByFileUID returns the truth source file destination of minIO by file UID
Expand Down Expand Up @@ -624,12 +660,12 @@ func (r *Repository) GetTruthSourceByFileUID(ctx context.Context, fileUID uuid.U
}

return &SourceMeta{
OriginalFileUID: originalFileUID,
OriginalFileUID: originalFileUID,
OriginalFileName: originalFileName,
Dest: dest,
CreateTime: createTime,
UpdateTime: updateTime,
KbUID: kbUID,
Dest: dest,
CreateTime: createTime,
UpdateTime: updateTime,
KbUID: kbUID,
}, nil
}

Expand Down
Loading

0 comments on commit ac88c1b

Please sign in to comment.