向量检索ANN(Approximate Nearest Neighbor Search),指的是对于一个query向量,从向量库中找到和它距离最接近的k个向量。这是一个典型的topk任务,这里要的不是“精确topk”而是“近似topk”,更多考虑的是 精度和计算时间之间的权衡,牺牲一部分精度来降低计算时间。

Faiss的全称是Facebook AI Similarity Search,是FaceBook的AI团队针对大规模相似度检索问题开发的一个工具,使用C++编写,有python接口,对10亿量级的索引可以做到毫秒级检索的性能



  • 实时查询,支持海量(百亿、千亿级别)规模库量级的;
  • 存储高效,要求构建的向量索引模型数据压缩比高,达到大幅缩减内存使占用的目的;
  • 召回精度好,topK有比较好的召回率,跟暴力搜索的结果相比;


git clone https://github.com/facebookresearch/faiss.git
git reset --hard dafdff110489db7587b169a0afee8470f220d295  # 我用的是这个版本

# 如果cmake版本太低,这里会提示需要升级cmake

# 升级cmake 
# 下载 
wget https://github.com/Kitware/CMake/releases/download/v3.23.0/cmake-3.23.0-linux-x86_64.sh

# 安装,安装好cd到对应的目录下查看是否有cmake的bin文件,配置环境变量
sudo bash ./cmake-3.23.0-linux-x86_64.sh --skip-licence --prefix=/usr
  • 编译&运行demo


CC = g++
CFLAGS = -g -Wall -I/home/your_path/faiss
LDFLAGS = -L/home/your_path/faiss/faiss/build/faiss -lfaiss
RPATH = -Wl,-rpath=/home/your_path/faiss/faiss/build/faiss

all: flat ivfflat hnsw ivfpq

flat: flat.cc
 $(CC) $(CFLAGS) -o flat flat.cc $(LDFLAGS) $(RPATH)


#include <faiss/IndexFlat.h>
#include <faiss/index_io.h>
#include <iostream>

int main() {
    std::cout << "hello faiss" << std::endl;
    // Define the dimension of the vectors
    int d = 4;

    // Define the number of vectors to index
    int nb = 100000;  // 10w

    // Generate some random data to index
    float *xb = new float[d * nb];   // 64 * 10w 
    for(int i = 0; i < nb; i++) {
        for(int j = 0; j < d; j++) {
            xb[d * i + j] = drand48(); // xb = [[向量1],[向量2] ... ]

    // Create an index
    faiss::IndexFlatL2 index(d);
    std::cout << "index dimension : " << d << std::endl;
    // Index the data
    index.add(nb, xb);  // xb = [dimension * vector num]

    // Save the index to disk
    // faiss::write_index(&index, "example.index");

    // Load the index from disk
    //faiss::IndexFlatL2 index2(d);
    //faiss::read_index(&index2, "example.index");

    // Search the index
    const int k = 5;
    const int query_vector_num = 3;
    long int *I = new long int[k * query_vector_num];  // 5 * 3 
    float *D = new float[k * query_vector_num];        // 5 * 3

    // 从xb中的前query_vector_num个向量作为输入
    // 从xb中寻找最近的k个返回
    index.search(query_vector_num, xb, k, D, I);

    // Print the results
    for(int i = 0; i < query_vector_num; i++) {
        printf("Query %d:\\n", i);
        for(int j = 0; j < k; j++) {
            show_vector(xb, I[k * i + j], d);
            printf("  %ld (distance=%g)\\n", I[k * i + j], D[k * i + j]);

    // Clean up
    delete[] xb;
    delete[] I;
    delete[] D;
    return 0;

这样一个简单的使用flat算法的faiss demo就运行起来了。

  • 代码中的一些专有名词:
flat 暴力检索
code_size 一个向量长度,默认情况下向量是不压缩的,长度=sizeof(float) * dim
codes 存放编码后的向量叫codes,用一维的float数组表示,存放方式 [[向量1],[向量2] ... ]
metric_type 计算距离的不同方法
centroid ivfflat算法中的聚类中心
probe ivfflat算法中搜索聚类中心相邻的区域
  • 基类设计:
    全量数据 :data→add→train(可选)→search
    增量数据 :data→add→search


// 基类index类,提供了基本的添加向量,检索topk,训练(对于需要预训练的检索类型)
// 不同的派生类会继承基类,来实现自定义实现
class Index {
    int d;        ///< vector dimension
    idx_t ntotal; ///< total nb of indexed vectors
    bool is_trained;
    /// type of metric this index uses for search
    MetricType metric_type;

    /** Add n vectors of dimension d to the index.
     * Vectors are implicitly assigned labels ntotal .. ntotal + n - 1
     * This function slices the input vectors in chunks smaller than
     * blocksize_add and calls add_core.
     * @param n      number of vectors
     * @param x      input matrix, size n * d
    virtual void add(idx_t n, const float* x) = 0;

     /** query n vectors of dimension d to the index.
     * return at most k vectors. If there are not enough results for a
     * query, the result array is padded with -1s.
     * @param n           number of vectors
     * @param x           input vectors to search, size n * d
     * @param k           number of extracted vectors
     * @param distances   output pairwise distances, size n*k
     * @param labels      output labels of the NNs, size n*k
    virtual void search(
            idx_t n,
            const float* x,
            idx_t k,
            float* distances,
            idx_t* labels,
            const SearchParameters* params = nullptr) const = 0;

    /** Perform training on a representative set of vectors
     * @param n      nb of training vectors
     * @param x      training vecors, size n * d
    virtual void train(idx_t n, const float* x);
    /** encode a set of vectors
     * @param n       number of vectors
     * @param x       input vectors, size n * d
     * @param bytes   output encoded vectors, size n * sa_code_size()
    virtual void sa_encode(idx_t n, const float* x, uint8_t* bytes) const;

    /** decode a set of vectors
     * @param n       number of vectors
     * @param bytes   input encoded vectors, size n * sa_code_size()
     * @param x       output vectors, size n * d
    virtual void sa_decode(idx_t n, const uint8_t* bytes, float* x) const;

1. Flat暴力求解


int main() {
    // 定义向量的维度
    int d = 4;
    // 定义向量的数量
    int nb = 100000;  // 10w

    // Generate some random data to index
    float *xb = new float[d * nb];   // 64 * 10w 
    for(int i = 0; i < nb; i++) {
        for(int j = 0; j < d; j++) {
            xb[d * i + j] = drand48(); // xb = [[向量1],[向量2] ... ]

    // 创建flat l2 index
    faiss::IndexFlatL2 index(d);
    // 在flat l2 index上添加向量库
    index.add(nb, xb);  // xb = [dimension * vector num]

    // 检索topk
    const int k = 5;
    const int query_vector_num = 3;
    // 保存检索结果 I返回topk向量的index,D返回topk向量的距离
    long int *I = new long int[k * query_vector_num];  // 5 * 3 
    float *D = new float[k * query_vector_num];        // 5 * 3

    // 开始检索
    // 从xb中的前query_vector_num个向量作为query向量,寻找topk,结果保存到D,I数组中
    index.search(query_vector_num, xb, k, D, I);

    // Print the results
    for(int i = 0; i < query_vector_num; i++) {
        printf("Query %d:\n", i);
        for(int j = 0; j < k; j++) {
            show_vector(xb, I[k * i + j], d);
            printf("  %ld (distance=%g)\n", I[k * i + j], D[k * i + j]);

    // Clean up
    delete[] xb;
    delete[] I;
    delete[] D;
    return 0;


// x: query vector 
// y: data base
// d: dimension
// nx: the number of query vector number 
// ny: the number of data base total
// res: result 
template <class BlockResultHandler, bool use_sel = false>
void exhaustive_L2sqr_seq(
        const float* x,
        const float* y,
        size_t d,
        size_t nx,
        size_t ny,
        BlockResultHandler& res,
        const IDSelector* sel = nullptr) {
    using SingleResultHandler =
            typename BlockResultHandler::SingleResultHandler;
    int nt = std::min(int(nx), omp_get_max_threads());

    FAISS_ASSERT(use_sel == (sel != nullptr));

// #pragma omp parallel num_threads(nt)   //open mp 多线程优化
        SingleResultHandler resi(res);
// #pragma omp for
        // 从查询的nx个向量里面依次遍历 (对于每个query x,找到它的topk)
        for (int64_t i = 0; i < nx; i++) {
            const float* x_i = x + i * d;
            const float* y_j = y;
            // 和向量库里面的向量依次比较
            // j 表示的向量的index, y_j 表示的是向量的起始地址
            for (size_t j = 0; j < ny; j++, y_j += d) {  
                if (use_sel && !sel->is_member(j)) {
                float disij = fvec_L2sqr(x_i, y_j, d);  // 计算距离 这里没有开平方根
                resi.add_result(disij, j);              // 加入堆中
2. IVFFlat

IVF表示的是inverted file ,相较于暴力匹配多了一步预筛的作用。目的是减少需要计算距离的目标向量的个数,做法就是直接对所有向量做k-means。

IVFFlat的基本思路是:假设聚类产出了1024个聚类中心,那么每来一个查询向量,首先计算其与1024个聚类簇心的距离,然后选择距离最近的top N个簇,只计算查询向量与这几个簇底下的向量的距离。这样计算的复杂度降低为 O(M * N/cluster_num)

// todo

如果query的向量处于两个聚类的边缘,很有可能被分到另外一个聚类中心,导致无法计算出最有结果。 但是我们可以通过各种方法提高它的精度。比如,找到与 xq 相近的几个 centroids, 然后在这几个区域内搜索. 我们每次搜索的范围(scope)覆盖的区域数量。在 faiss 里面叫 nprobe, 我们可以提高 nprobe 的数量, 以提高精确度。

int main() {
    // ...
    int nlist = 100;    // 聚类中心
    int k = 4;          // top k 
    faiss::IndexFlatL2 quantizer(d);
    faiss::IndexIVFFlat index(&quantizer, d, nlist);
    // step1 :训练 
    // 目的是产生100个聚类中心
    index.train(nb, xb);
    // step2:add添加向量
    // 根据第一步计算出来的聚类中心,将向量添加到离他最近的聚类中心 (每个聚类中心都有对应的倒排向量)
    index.add(nb, xb);
    // step3:查询
    // 根据聚类中心和nprob(搜索的相邻的聚类中心),找到对应聚类中心的倒排链,然后在倒排链上完成检索
    idx_t* I = new idx_t[k * nq];
    float* D = new float[k * nq];
    index.search(nq, xq, k, D, I);   // nq : the number of query , xy : query database 

IndexIVF 类设计:

// IndexIVF 继承了 Index, 同时继承了 L1 量化器
struct IndexIVF : Index, Level1Quantizer {

    InvertedLists* invlists;   // 向量编码后存储在倒排表里面(包括很多个倒排文件)  // 本质是两个vector嵌套的二维数组
    bool own_invlists;
    size_t code_size; // 一个d维向量最后被编码为多少字节
    size_t nprobe;    // 搜索的时候, 在几个区域里搜
    size_t max_codes; // 一次搜索最多在几个区域里搜, 用于覆盖 nprobe 参数
    // OpenMP 并行化模式
    // 0: 默认情况, 将请求拆分
    // 1: 在倒排表上进行并行化
    // 2: 同时使用0和1
    // 3: 将请求拆分为更细粒度
    int parallel_mode;
    const int PARALLEL_MODE_NO_HEAP_INIT = 1024;
    // 记录向量id与倒排表中倒排文件的关系, 可选
    // 用于支持 reconstruct 方法, 暂时不用关注
    DirectMap direct_map;
            Index* quantizer,
            size_t d,
            size_t nlist,
            size_t code_size,
            MetricType metric = METRIC_L2);
    void reset() override;
    void train(idx_t n, const float* x) override;
    void add(idx_t n, const float* x) override;
    void add_with_ids(idx_t n, const float* x, const idx_t* xids) override;
    void search(
            idx_t n,
            const float* x,
            idx_t k,
            float* distances,
            idx_t* labels) const override;
    size_t remove_ids(const IDSelector& sel) override;
    ~IndexIVF() override;

step1 train:

// 训练 L1 量化器以及子量化器
// n 为训练集向量数量(向量维度d)
// x 为训练集(长度 n*d)
void IndexIVF::train(idx_t n, const float* x) {
    if (verbose)    // 打印详情, debug 用, 默认 false
        printf("Training level-1 quantizer\n");
    // 该方法继承自 Level1Quantizer, 训练 L1 量化器
    train_q1(n, x, verbose, metric_type);
    if (verbose)
        printf("Training IVF residual\n");
    // 通过残差训练子量化器, 默认的实现是啥也不干, 先不管它
    train_residual(n, x);
    // 标记我们的 IVF 索引已经训练好了, 可以用了
    is_trained = true;

step2 add:

void IndexIVF::add_with_ids(idx_t n, const float* x, const idx_t* xids) {
    std::unique_ptr<idx_t[]> coarse_idx(new idx_t[n]);
    // 1. 找到添加的向量对应的聚类中心
    quantizer->assign(n, x, coarse_idx.get());
    // 2. 开始添加
    add_core(n, x, xids, coarse_idx.get());

// n : 添加的向量的数量
// x : 向量数据
// xids:nullptr 
// coarse_idx: 对应的聚类中心数组
void IndexIVF::add_core(
        idx_t n,
        const float* x,
        const idx_t* xids,
        const idx_t* coarse_idx,
        void* inverted_list_context) {

    size_t nadd = 0, nminus1 = 0;

    for (size_t i = 0; i < n; i++) {
        if (coarse_idx[i] < 0)

    std::unique_ptr<uint8_t[]> flat_codes(new uint8_t[n * code_size]);
    encode_vectors(n, x, coarse_idx, flat_codes.get());

    DirectMapAdd dm_adder(direct_map, n, xids);

#pragma omp parallel reduction(+ : nadd)
        int nt = omp_get_num_threads();
        int rank = omp_get_thread_num();

        // each thread takes care of a subset of lists
        for (size_t i = 0; i < n; i++) {
            idx_t list_no = coarse_idx[i];
            if (list_no >= 0 && list_no % nt == rank) {
                idx_t id = xids ? xids[i] : ntotal + i;
                size_t ofs = invlists->add_entry( 
                        flat_codes.get() + i * code_size,

                dm_adder.add(i, list_no, ofs);

            } else if (rank == 0 && list_no == -1) {
                dm_adder.add(i, -1, 0);
    ntotal += n;

step3 search:

void IndexIVF::search(
        idx_t n,
        const float* x,
        idx_t k,
        float* distances,
        idx_t* labels,
        const SearchParameters* params_in) const {
    FAISS_THROW_IF_NOT(k > 0);
    const size_t nprobe =
            std::min(nlist, params ? params->nprobe : this->nprobe);   // nprob 表示查找几个区域 默认是=1
    FAISS_THROW_IF_NOT(nprobe > 0);

    // search function for a subset of queries
    auto sub_search_func = [this, k, nprobe, params](
                                   idx_t n,
                                   const float* x,
                                   float* distances,
                                   idx_t* labels,
                                   IndexIVFStats* ivf_stats) {
        std::unique_ptr<idx_t[]> idx(new idx_t[n * nprobe]);         // 通过query的index,找到它的聚类中心的index
        std::unique_ptr<float[]> coarse_dis(new float[n * nprobe]);  // 通过query的index,找到它到聚类中心的距离
        double t0 = getmillisecs();   
        // IndexFlat::search  这里应该是找到query向量的聚类中心向量
                params ? params->quantizer_params : nullptr);

        double t1 = getmillisecs();
        // 预取所需的倒排文件, 默认实现是啥也不干(倒排文件放磁盘的时候才预取)
        invlists->prefetch_lists(idx.get(), n * nprobe);
        // 真正的搜索
        double t2 = getmillisecs();
        ivf_stats->quantization_time += t1 - t0;
        ivf_stats->search_time += t2 - t0;

    if ((parallel_mode & ~PARALLEL_MODE_NO_HEAP_INIT) == 0) {
        // ... 先忽略
    } else {
        // 入口
        sub_search_func(n, x, distances, labels, &indexIVF_stats);


// CMin 泛型类
// 提供T对象的cmp操作,min类就是比较谁最小;
// T1类型支持传入多个参数,如果T相同,那么支持比较T1
template <typename T_, typename TI_>
struct CMin {
    typedef T_ T;
    typedef TI_ TI;
    typedef CMax<T_, TI_> Crev; // reference to reverse comparison
    // 提供cmp比较谁更小
    inline static bool cmp(T a, T b) {
        return a < b;
    inline static bool cmp2(T a1, T b1, TI a2, TI b2) {
        return (a1 < b1) || ((a1 == b1) && (a2 < b2));
    // 返回T类型的最小值
    inline static T neutral() {
        return std::numeric_limits<T>::lowest();
    static const bool is_max = false;

    // 返回T类型的下一个值
    inline static T nextafter(T x) {
        return cmin_nextafter(x);

// 使用方式2:
using HeapForIP2 = CMin<uint16_t, int64_t>;
HeapForIP2 heap2;
std::cout << heap2.cmp(100, 200) << std::endl;
std::cout << heap2.neutral() << std::endl;
std::cout << heap2.nextafter(100) << std::endl;

// 使用方式1:
// IP内积实际上是算余弦相似度, 越大越相似, 使用最小堆求 topK 个最大值
// L2计算里氏距离, 越小越相似, 使用最大堆求 topK 个最小值
using HeapForIP = CMin<float, idx_t>;
using HeapForL2 = CMax<float, idx_t>;
heap_heapify<HeapForIP>(k, simi, idxi);

template <class C>
inline void heap_heapify(
        size_t k,
        typename C::T* bh_val,
        typename C::TI* bh_ids,
        const typename C::T* x = nullptr,
        const typename C::TI* ids = nullptr,
        size_t k0 = 0) {
    if (k0 > 0)

    if (ids) {
        for (size_t i = 0; i < k0; i++)
            heap_push<C>(i + 1, bh_val, bh_ids, x[i], ids[i]);
    } else {
        for (size_t i = 0; i < k0; i++)
            heap_push<C>(i + 1, bh_val, bh_ids, x[i], i);

    for (size_t i = k0; i < k; i++) {
        bh_val[i] = C::neutral();
        bh_ids[i] = -1;

template <class C>
inline void heap_push(
        size_t k,
        typename C::T* bh_val,
        typename C::TI* bh_ids,
        typename C::T val,
        typename C::TI id) {
    bh_val--; /* Use 1-based indexing for easier node->child translation */
    size_t i = k, i_father;
    while (i > 1) {
        i_father = i >> 1;
        if (!C::cmp2(val, bh_val[i_father], id, bh_ids[i_father])) {
            /* the heap structure is ok */
        bh_val[i] = bh_val[i_father];
        bh_ids[i] = bh_ids[i_father];
        i = i_father;
    bh_val[i] = val;
    bh_ids[i] = id;


向量维度 d :128
metrice_type ip内积
nlist nlist=3000 聚类区域个数
niter niter = 25 聚类迭代次数
nredo nredo = 1 聚类重复计算次数,取最小结果
nprobe nprobe=3 搜索查找相邻聚类中心的数量

IVFFlat 可以在牺牲一部分精度的前提下,提高检索的速度。但是flat的方式,并不会向量进行压缩,为了节约内存提出了ivfpq的算法。PQ 是 Product quantization(乘积量化) 的缩写,按照文章里面的测试,可以降低90%多的内存占用。

IVFPQ 的思路是:

  1. 首先对向量库的向量按照维度进行切分,比如把 n * 128维的向量切分为 n * (8 个 16维)的子向量;
  2. 分别对子向量进行聚类 (ivf提现在这里),比如每个子向量产出 256个聚类中心,一共 8 * 256 个聚类中心;
  3. 对于一个128维的向量而来,可以分为8个子向量,每个子向量都可以找到对应的聚类中心。那么,就可以用对应的聚类中心序号来代替这个向量。[1,2,xxxxxx, 127] --> [c1,c2,c3,c4,c5,c6,c7,c8]。这里c的值域空间是0~255,只需要8个bit就可以存下来;这样就可以把一个很长的向量用很小的空间保存下来了。(pq核心思想)
  4. 对于查询向量q来说,首先先把自己量化,变为[c1,c3,c4,c2,xxxx,cn]。然后,在量化过后的向量库上做topk,找到和自己最近的k个量化后的向量。然后,将这个k个向量转换为对应的原始向量。计算出距离。
int main() {
    int nlist = 100;                 // 聚类中心数量
    int k = 4;                       // topk
    int m = 8;                       // number of subquantizers
    faiss::IndexFlatL2 quantizer(d); 
    faiss::IndexIVFPQ index(&quantizer, d, nlist, m, 8);
    index.train(nb, xb);
    index.add(nb, xb);

    idx_t* I = new idx_t[k * 5];
    float* D = new float[k * 5];
    index.search(5, xb, k, D, I);
    return 0;

HNSW是多层 negivate small world的缩写,HNSW/NSW 都是利用了图论的思想(首先假设有一张图,先不考虑这张图是如何构建的),通过随机找一个入口点,从入口点开始按照各种算法去遍历图,直到直到相近的点。下面先从最简单的朴素算法开始介绍。




  1. 有一个很容易的的朴素想法,把某些点和点之间连上线,构成一个查找图,存下来备用;
  2. 当我想查找与粉色点最近的一点时,我从任意一个黑色点出发,计算它和粉色点的距离,与这个任意黑色点有连接关系的点我们称之为“友点”(直译);
  3. 然后我要计算这个黑色点的所有“友点”与粉色点的距离,从所有“友点”中选出与粉色点最近的一个点,把这个点作为下一个进入点;
  4. 继续按照上面的步骤查找下去。如果当前黑色点对粉色点的距离比所有“友点”都近,终止查找,这个黑色点就是我们要找的离粉色点最近的点。
  • 步骤:
  1. 从任意一个黑色点出发,这里我们随便选个C点吧,计算一下C点和粉色点的距离,存下来备用;
  2. 再计算C点的所有友点(A,I,B)与粉色点的距离(计算距离和度量的方式有多种,这里我们采用欧氏距离,就是二维物理空间上的“近和远”),我们计算得出B与粉色点的距离最近,而且B点距离粉色点的距离比C点距离粉色点的距离(前面算过)更近l;
  3. 所以我们下面用B点继续查找。B点距离粉色点的距离保存下来,B点的友点是E,A,C,I,H,分别计算它们与粉色点的距离,得到E点与粉色点距离最近,且E点比B点距离粉色点还要近,所以我们选择E点作为下一个查找点。
  4. E点的友点是J,B,D,G,这时我们发现J点的与粉色点的距离最近,但是,but,however,J点的距离粉色点的距离比E点还要远,所以满足了终止查找的条件,因此我们返回E点。
  • 缺点:
  1. 首先,我们发现图中的K点是无法被查询到的,因为K点没有友点
  2. 其次,如果我们要查找距离粉色点最近的两个点,而这两个近点之间如果没有连线,那么将大大影响效率(比如L和E点,如果L和E有连线,那么我们可以轻易用上述方法查出距离粉色点最近的两个点);
  3. 最后D点真的需要这么多“友点”吗?谁是谁的友点应该怎么确定呢?



  1. 向图中逐个插入点,点的选取上随机的;
  2. 插入一个全新点时,通过朴素想法中的朴素查找法查找到与这个全新点最近的m个点(通过计算“友点”和待插入点的距离来判断下一个进入点是哪个点,m由用户设置);
  3. 连接全新点到m个点的连线。




设置一个常数ml,通过公式floor(-ln(uniform(0,1)) x ml)来计算这个点可以深入到第几层。公式中x是乘号,floor()的含义是向下取整,uniform(0,1)的含义是在均匀分布中随机取出一个值,ln()表示取对数。HNSW的算法参数:

  1. 一个节点的友点数量,友点越多,图越复杂,查询效率越低;
  2. hnsw的层数,层数越少,查询效率越低;层数越多,内存消耗和构建图的复杂度会更多;
metrice_type ip内积
m m=30, 第非0层的邻居的个数,第0层邻居为2m
ef_search ef_search:160 为了避免查询时间过长,单个向量的candidate的pop最多进行ef_search次
b_m b_m:128 构建时每层邻居的个数

4. faiss中关于计算效率的优化

faiss 大面积的使用了open mp来进行平行计算来提供计算效率,这种场景读多写少,for(i~n)循环计算的逻辑非常适合用open mp改造。举一个简单的例子:

#include <iostream>
#include <omp.h>
#include <vector>
#include <chrono>

int main() {
    int n = 100000000;
    std::vector<int> nums(n, 1);
    int sum = 0;

    auto start = std::chrono::high_resolution_clock::now();

    // 无优化的串行计算
    for (int i = 0; i < n; i++) {
        sum += nums[i];

    auto end = std::chrono::high_resolution_clock::now();
    std::chrono::duration<double> serial_time = end - start;
    std::cout << "Serial sum: " << sum << " - Time: " << serial_time.count() << " seconds" << std::endl;

    sum = 0; // 重置sum
    start = std::chrono::high_resolution_clock::now();

    // 使用OpenMP并行计算
    #pragma omp parallel for reduction(+:sum)
    for (int i = 0; i < n; i++) {
        sum += nums[i];

    end = std::chrono::high_resolution_clock::now();
    std::chrono::duration<double> parallel_time = end - start;

    std::cout << "Parallel sum: " << sum << " - Time: " << parallel_time.count() << " seconds" << std::endl;
    std::cout << "Speedup: " << serial_time.count() / parallel_time.count() << std::endl;
    return 0;

Serial sum: 100000000 - Time: 0.234739 seconds
Parallel sum: 100000000 - Time: 0.0501435 seconds
Speedup: 4.68134

open mp这种并行编程模型,提供了更高级的并行抽象,也更容易改造基于for循环的串行代码。

5. 小结
  • Faiss不同索引的对比:
    1. 不在意内存:选择HNSW;
    2. 不在意时间:XXXFlat;
    3. 在意内存,在意时间,不要在意精度: PQ/IVFFALT


