
[Feature]: Support for thread number limits in queries and maximum memory footprint metrics for indexing and querying #38013

lmccccc opened this issue Nov 26, 2024 · 5 comments


lmccccc commented Nov 26, 2024

Is there an existing issue for this?

  • I have searched the existing issues

Is your feature request related to a problem? Please describe.

I would like to request two features that would enhance the control and observability of Milvus 2.4 during query and indexing operations:

Ability to Limit Thread Number in Queries:
Provide a configuration option or API to control the maximum number of threads utilized during query execution.

Expose Maximum Memory Footprint Metrics:
Introduce a way to fetch memory usage metrics during both indexing and querying, such as peak memory usage during index construction.

Describe the solution you'd like.

No response

Describe an alternate solution.

No response

Anything else? (Additional Context)

If such features are already available, how do I use them? It seems a distributed Milvus deployment can use the monitoring stack to track resource usage, but I would like to measure this in the Standalone or Lite version.
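
For illustration, the most I can currently do in Standalone is scrape the Prometheus endpoint it exposes (a minimal sketch, assuming the default metrics port 9091; process_resident_memory_bytes is the standard Go process metric, and whether finer-grained per-phase series exist is exactly my question):

import requests

# scrape the Prometheus metrics endpoint of Milvus Standalone
# (9091 is the default metrics port; adjust if remapped)
resp = requests.get("http://localhost:9091/metrics", timeout=5)
for line in resp.text.splitlines():
    # whole-process resident memory; not broken down by indexing vs. querying
    if line.startswith("process_resident_memory_bytes"):
        print(line)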

lmccccc added the kind/feature label Nov 26, 2024
xiaofan-luan (Collaborator) commented


We already use a thread pool to limit the threads that execute. Did you find any thread leakage? Can you explain your test environment and how you ran the test?


lmccccc commented Nov 26, 2024


We already use a thread pool to limit the threads that execute. Did you find any thread leakage? Can you explain your test environment and how you ran the test?

I am conducting an experiment to estimate QPS using a single thread. Additionally, I would like to monitor the memory footprint during index construction. My question is about how to configure the thread number for queries and how to retrieve memory statistics for indexing.

Here is my code:

import sys
import math
import time
import numpy as np
from pymilvus import DataType, MilvusClient
from defination import check_dir, check_file, ivecs_read, fvecs_read, bvecs_read, read_attr, read_file

# read args
if __name__ == "__main__":
    # expected args:
    #   dataset N dataset_file query_file dataset_attr_file
    #   query_range_file groundtruth_file K nprobe
    #   collection_name mode M d
    if len(sys.argv) != 14:
        print("error: expected 13 arguments, got", len(sys.argv) - 1)
        sys.exit(1)
    else:
        dataset = sys.argv[1]
        print("dataset:", dataset)

        N = int(sys.argv[2])
        print("N:", N)
        
        dataset_file = sys.argv[3]
        print("dataset file:", dataset_file)
        check_file(dataset_file)

        query_file = sys.argv[4]
        print("query file:", query_file)
        check_file(query_file)

        dataset_attr_file = sys.argv[5]
        print("attr file:", dataset_attr_file)
        check_file(dataset_attr_file)

        query_range_file = sys.argv[6]
        print("query range file:", query_range_file)
        check_file(query_range_file)

        groundtruth_file = sys.argv[7]
        print("ground truth file:", groundtruth_file)
        check_file(groundtruth_file)

        K = int(sys.argv[8])
        print("K:", K)
        
        nprobe = int(sys.argv[9])
        print("nprobe:", nprobe)

        c_name = sys.argv[10]
        print("collection name:", c_name)

        mode = sys.argv[11]    # "construction" rebuilds the collection before querying

        M = int(sys.argv[12])  # number of PQ sub-quantizers

        d = int(sys.argv[13])  # vector dimension


    client = MilvusClient(
        uri="http://localhost:19530"
    )

    # create collection
    # c_name = "milvus_" + dataset + "_collection_" + label_attr
    # if client.has_collection(c_name) and mode != "construction":
    #     print("collection ", c_name, " exists")

    #     client.load_collection(collection_name=c_name, 
    #                            replica_number=1,
    #                            load_fields=["id", "vector", "label"])
    #     res = client.query(
    #         collection_name=c_name,
    #         output_fields=["count(*)"]
    #     )

    #     print("collection size:", res)

        
        
    #     # client.drop_collection(c_name)
    # else:
    if True:  # always rebuild the collection (the reuse branch above is kept commented out)
        if client.has_collection(c_name): # construction mode
            print("collection ", c_name, " exists, then drop it")
            client.drop_collection(collection_name=c_name)

        # load data
        dataset = read_file(dataset_file)
        N, _d = dataset.shape
        assert(_d == d)
        print("get dataset size:", N, " d:", _d)
        print("dim:", d)

        attr = read_attr(dataset_attr_file)
        _N = len(attr)
        print("_N:", _N, " N:", N)
        assert(_N == N)

        label_set = set(attr)
        label_cnt = len(label_set)
        partition_size = min(64, label_cnt)

        # create collection
        schema = MilvusClient.create_schema(
            auto_id=False,
            partition_key_field="label",      # partition by label; num_partitions defaults to 64
            num_partitions=partition_size
        )
        schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
        schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=d)
        schema.add_field(field_name="label", datatype=DataType.INT64)
        
        index_params = client.prepare_index_params()
        # index_params.add_index(
        #     field_name="label",
        #     index_type="STL_SORT"
        # )
        nlist = int(math.sqrt(N))
        print("partition M:", M)
        print("nlist:", nlist)
        index_params.add_index(
            field_name="vector", 
            index_type="IVF_PQ",# IVF_FLAT IVF_PQ IVF_SQ8 HNSW SCANN
            metric_type="L2",
            params={ "nlist": nlist, "m": M} # see https://milvus.io/docs/configure_querynode.md#queryNodesegcoreinterimIndexnlist
        )
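        # note: IVF_PQ requires the vector dimension d to be divisible by m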

        client.create_collection(collection_name=c_name, 
                                    schema=schema,
                                    index_params=index_params,
                                    enable_dynamic_field=True)


        
        data = [
            {"id": i, "vector": dataset[i].tolist(), "label": attr[i]}
            for i in range(N)
        ]

        # inserting too much data in one batch can cause a timeout, so insert in chunks
        print("start insertion")
        # set start time
        t0 = time.time()
        batch_size = 100000
        for i in range(0, N, batch_size):
            client.insert(collection_name=c_name, data=data[i:i+batch_size])
            print("insert batch:", i/batch_size)
        # client.insert(collection_name=c_name, data=data)
        # print("insert res:", res)
        t1 = time.time()
        print("insert suc, time cost:", t1-t0)
        
        if mode == "construction":
            print("construction done")
            # exit()


    query = read_file(query_file)
    Nq, _d = query.shape
    print("query d:", _d)
    print("get query szie:", Nq)  
    
    qrange = read_attr(query_range_file)
    qrange = qrange.reshape(-1, 2)
    _Nq = len(qrange)
    print("get range cnt:", _Nq)
    assert(_Nq == Nq)

    gt = read_attr(groundtruth_file)
    gt = gt.reshape(-1, K)
    Ngt = gt.shape[0]
    
    print("ngt=", Ngt, " nq=", Nq, " k=", K)
    assert(Ngt == Nq)

    ids = []
    q_t = 0
    s_params = {"metric_type": "L2", "params": {"nprobe": nprobe}}
    hist = np.zeros(11, dtype=float)
    total_size = Nq * K
    positive_size = 0
    for i in range(Nq):
        # if(i % 100 == 0):
        #     print("batch ", i)
        q_cnt = 0
        exp = str(qrange[i][0]) + " <= label <= " + str(qrange[i][1])
        # qr = [i for i in range(qrange[i][0], qrange[i][1] + 1)]
        # exp = "label in " + str(qr)
        t0 = time.time()
        res = client.search(
                            collection_name=c_name,
                            data=[query[i].tolist()], 
                            filter=exp,
                            limit=K,
                            search_params=s_params, 
                            group_strict_size=True,
                            )
        t1 = time.time()
        q_t += t1 - t0
        res_id = [x['id'] for x in res[0]]
        ids.append(res_id)
        # if(len(res_id) != K):
        #     print("error, get ", len(res_id), " results, expect ", K)
        #     print("res:", res)
        #     print("exp:", exp)

        # for top in range(K):
        #     for gtind in range(K):
        #         if(ids[i][top] == gt[i][gtind]):
        #             positive_size += 1
        #             q_cnt += 1
        #             break
        # q_recall = q_cnt * 1.0 / K
        # q_bucket = math.floor(q_recall * 10)
        # hist[q_bucket] += 1
        # if i % 100 == 0:
        #     print( "temp recall:", positive_size * 1.0 / ((i+1)* K))
        #     _hist = hist / (i + 1)
        #     print("temp recall dist(10\% per bucket): ", _hist)

    hist = np.zeros(11, dtype=float)
    total_size = Nq * K
    positive_size = 0
    client.drop_collection(collection_name=c_name)
    print("ids shape:", len(ids), len(ids[0]))
    print("gt shape:", gt.shape)
    for qind in range(Nq):
        q_cnt = 0
        for top in range(K):
            for gtind in range(K):
                if(ids[qind][top] == gt[qind][gtind]):
                    positive_size += 1
                    q_cnt += 1
                    break
        q_recall = q_cnt * 1.0 / K
        q_bucket = math.floor(q_recall * 10)
        hist[q_bucket] += 1

    recall = positive_size * 1.0 / total_size
    print("milvus get ", positive_size, " postive res from ", total_size, " results, recall@", K, ":", recall)
    hist = hist / Nq
    print("recall dist(10\% per bucket): ", hist)
    print(Nq, " queries cost:", q_t, ", qps:" , Nq / q_t)

I am running Milvus 2.4 Standalone with the server and query client on the same machine (Ubuntu 20 with 128 cores and 2 TB RAM). I would like to know how to:

  1. Retrieve the memory footprint during index construction.
  2. Limit the query thread count in Milvus.

There is no thread leak occurring in my experiment.

Additionally, I am facing an issue where the recall rate decreases as the query progresses, even though I am using the same query set and experimental setup throughout.
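
For now, the closest workaround I have for (1) in Standalone is to poll the resident memory of the server process from outside while insertion and index building run. This is only a sketch; the process name "milvus" is an assumption about my deployment, and with Docker I would sample the container's process instead.

import time
import psutil

def sample_peak_rss(proc, duration_s=60, interval_s=0.5):
    # poll the process RSS and keep the maximum observed
    peak = 0
    deadline = time.time() + duration_s
    while time.time() < deadline:
        peak = max(peak, proc.memory_info().rss)
        time.sleep(interval_s)
    return peak

# find the Milvus server process by name (assumed to be "milvus")
milvus = next(p for p in psutil.process_iter(["name"]) if p.info["name"] == "milvus")
print("peak RSS during construction (GiB):", sample_peak_rss(milvus) / 2**30)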

xiaofan-luan (Collaborator) commented

The reason is that you are using a PQ index. Once the index is built, it offers lower recall than the growing segment. If you want better recall, HNSW is usually recommended.

There is no way to separate index memory usage from data-load memory usage unless you use separate index nodes and query nodes.
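
For reference, a hedged HNSW variant of the add_index call in the script above; the M and efConstruction values are illustrative starting points, not tuned recommendations:

index_params.add_index(
    field_name="vector",
    index_type="HNSW",
    metric_type="L2",
    params={"M": 16, "efConstruction": 200},  # illustrative values
)
# HNSW search takes "ef" instead of "nprobe":
s_params = {"metric_type": "L2", "params": {"ef": 64}}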

xiaofan-luan (Collaborator) commented

Milvus already limits the number of threads used for queries, so ideally you don't need to care about it. The default thread count scales linearly with the number of cores on your machine.
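
If you really need a single-core measurement, one OS-level workaround (not a Milvus setting) is to pin the standalone server process to one CPU on Linux; a sketch, again assuming the process is named "milvus":

import os
import psutil

# restrict the Milvus server to CPU core 0; this bounds parallelism from
# outside the process rather than through any Milvus configuration
milvus = next(p for p in psutil.process_iter(["name"]) if p.info["name"] == "milvus")
os.sched_setaffinity(milvus.pid, {0})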


lmccccc commented Nov 26, 2024

Milvus already limits the number of threads used for queries, so ideally you don't need to care about it. The default thread count scales linearly with the number of cores on your machine.

Thank you for your response.

If there is any method to configure the thread count for query execution or other related operations, I would be grateful if you could share it. It would be extremely helpful for my experiment.
