美文网首页
halide编程技术指南(连载四)

halide编程技术指南(连载四)

作者: 麻瓜智能 | 来源:发表于2020-04-04 16:34 被阅读0次

    本文是halide编程指南的连载,已同步至公众号

    第八章 多级管道调度

    #include "Halide.h"

    #include <stdio.h>

    using namespace Halide;

    int main(int argc, char **argv) {

        // 首先,我们将在下面声明一些Vars.

        Var x("x"), y("y");

        // 让我们研究一个简单的两阶段管道的各种调度选项。 我们将从默认时间表开始:

        {

            Func producer("producer_default"), consumer("consumer_default");

            // 第一阶段将是一些简单的逐点数学运算,类似于我们熟悉的渐变函数。 x,y位置的值是x和y乘积的正弦值.

            producer(x, y) = sin(x * y);

            // 现在,我们将添加第二阶段,该阶段将第一阶段的多个点平均在一起.

            consumer(x, y) = (producer(x, y) +

                              producer(x, y+1) +

                              producer(x+1, y) +

                              producer(x+1, y+1))/4;

            //我们将打开这两个功能的跟踪。

            consumer.trace_stores();

            producer.trace_stores();

            // 并在4x4块上进行评估。

            printf("\nEvaluating producer-consumer pipeline with default schedule\n");

            consumer.realize(4, 4);

    -----------------------------------------------------------------

    > Begin pipeline consumer_default.0()

    > Store consumer_default.0(0, 0) = 0.210368

    > Store consumer_default.0(1, 0) = 0.437692

    > Store consumer_default.0(2, 0) = 0.262604

    > Store consumer_default.0(3, 0) = -0.153921

    > Store consumer_default.0(0, 1) = 0.437692

    > Store consumer_default.0(1, 1) = 0.475816

    > Store consumer_default.0(2, 1) = 0.003550

    > Store consumer_default.0(3, 1) = 0.023565

    > Store consumer_default.0(0, 2) = 0.262604

    > Store consumer_default.0(1, 2) = 0.003550

    > Store consumer_default.0(2, 2) = -0.225879

    > Store consumer_default.0(3, 2) = 0.146372

    > Store consumer_default.0(0, 3) = -0.153921

    > Store consumer_default.0(1, 3) = 0.023565

    > Store consumer_default.0(2, 3) = 0.146372

    > Store consumer_default.0(3, 3) = -0.237233

    > End pipeline consumer_default.0()

    ------------------------------------------------------------------------

            // 没有有关计算生产者价值的消息。这是因为默认计划将“producer”完全内联到“consumer”中。 就像我们写了以下代码一样:

            // consumer(x, y) = (sin(x * y) +

            //                  sin(x * (y + 1)) +

            //                  sin((x + 1) * y) +

            //                  sin((x + 1) * (y + 1))/4);

            // 所有对“producer”的调用均已替换为“producer”的主体,并用参数代替了变量。

            // 等效的c代码为:

            float result[4][4];

            for (int y = 0; y < 4; y++) {

                for (int x = 0; x < 4; x++) {

                    result[y][x] = (sin(x*y) +

                                    sin(x*(y+1)) +

                                    sin((x+1)*y) +

                                    sin((x+1)*(y+1)))/4;

                }

            }

            printf("\n");

            // 如果我们看一下循环嵌套,producer根本就不会出现。 它已内联到consumer中。

            printf("Pseudo-code for the schedule:\n");

            consumer.print_loop_nest();

            printf("\n");

    > produce consumer_default:

    >  for y:

    >    for x:

    >      consumer_default(...) = ...

    --------------------------------------------------------------------------

        }

        // 接下来,我们将研究下一个最简单的选项-在计算任何consumer之前,计算producer中所需的所有值。 我们称此时间表为“root”。

        {

            // 从相同的函数开始:

            Func producer("producer_root"), consumer("consumer_root");

            producer(x, y) = sin(x * y);

            consumer(x, y) = (producer(x, y) +

                              producer(x, y+1) +

                              producer(x+1, y) +

                              producer(x+1, y+1))/4;

            // 告诉Halide在评估任何producer之前先评估所有consumer.

            producer.compute_root();

            // 打开跟踪.

            consumer.trace_stores();

            producer.trace_stores();

            // 编译运行.

            printf("\nEvaluating producer.compute_root()\n");

            consumer.realize(4, 4);

    --------------------------------------------------------------------------

    > Begin pipeline consumer_root.0()

    > Store producer_root.0(0, 0) = 0.000000

    > Store producer_root.0(1, 0) = 0.000000

    > Store producer_root.0(2, 0) = 0.000000

    > Store producer_root.0(3, 0) = 0.000000

    > Store producer_root.0(4, 0) = 0.000000

    > Store producer_root.0(0, 1) = 0.000000

    > Store producer_root.0(1, 1) = 0.841471

    > Store producer_root.0(2, 1) = 0.909297

    > Store producer_root.0(3, 1) = 0.141120

    > Store producer_root.0(4, 1) = -0.756802

    > Store producer_root.0(0, 2) = 0.000000

    > Store producer_root.0(1, 2) = 0.909297

    > Store producer_root.0(2, 2) = -0.756802

    > Store producer_root.0(3, 2) = -0.279415

    > Store producer_root.0(4, 2) = 0.989358

    > Store producer_root.0(0, 3) = 0.000000

    > Store producer_root.0(1, 3) = 0.141120

    > Store producer_root.0(2, 3) = -0.279415

    > Store producer_root.0(3, 3) = 0.412118

    > Store producer_root.0(4, 3) = -0.536573

    > Store producer_root.0(0, 4) = 0.000000

    > Store producer_root.0(1, 4) = -0.756802

    > Store producer_root.0(2, 4) = 0.989358

    > Store producer_root.0(3, 4) = -0.536573

    > Store producer_root.0(4, 4) = -0.287903

    > Store consumer_root.0(0, 0) = 0.210368

    > Store consumer_root.0(1, 0) = 0.437692

    > Store consumer_root.0(2, 0) = 0.262604

    > Store consumer_root.0(3, 0) = -0.153921

    > Store consumer_root.0(0, 1) = 0.437692

    > Store consumer_root.0(1, 1) = 0.475816

    > Store consumer_root.0(2, 1) = 0.003550

    > Store consumer_root.0(3, 1) = 0.023565

    > Store consumer_root.0(0, 2) = 0.262604

    > Store consumer_root.0(1, 2) = 0.003550

    > Store consumer_root.0(2, 2) = -0.225879

    > Store consumer_root.0(3, 2) = 0.146372

    > Store consumer_root.0(0, 3) = -0.153921

    > Store consumer_root.0(1, 3) = 0.023565

    > Store consumer_root.0(2, 3) = 0.146372

    > Store consumer_root.0(3, 3) = -0.237233

    > End pipeline consumer_root.0()

    -------------------------------------------------------------------------------

            // 从输出中可以看到:

            // A) 有关于producer的store

            // B) 在consumer的store之前就已经发生了

            // 如下是可视化图.

            // producer在左边,consumer在右边。 store用橙色标记,load用蓝色标记。

            图81

            // 等效c代码:

            float result[4][4];

            // 为producer分配一些临时存储.

            float producer_storage[5][5];

            // 计算 producer.

            for (int y = 0; y < 5; y++) {

                for (int x = 0; x < 5; x++) {

                    producer_storage[y][x] = sin(x * y);

                }

            }

            // 计算consumer. 跳过打印,不看打印的中间结果了.

            for (int y = 0; y < 4; y++) {

                for (int x = 0; x < 4; x++) {

                    result[y][x] = (producer_storage[y][x] +

                                    producer_storage[y+1][x] +

                                    producer_storage[y][x+1] +

                                    producer_storage[y+1][x+1])/4;

                }

            }

            // 请注意,对consumer的评估是在4x4的box上进行的,因此Halide会自动推断出在5x5的box上需要producer。 这与我们在上一课中看到的“边界推断”逻辑相同,在该逻辑中,该逻辑用于检测和避免输入图像的越界读取。

            // 如果我们打印循环嵌套,我们将看到与上面的C非常相似的内容.

            printf("Pseudo-code for the schedule:\n");

            consumer.print_loop_nest();

            printf("\n");

    -----------------------------------------------------------------

    > produce producer_root:

    >  for y:

    >    for x:

    >      producer_root(...) = ...

    > consume producer_root:

    >  produce consumer_root:

    >    for y:

    >      for x:

    >        consumer_root(...) = ...

    --------------------------------------------------------------------

        }

        // 让我们从性能的角度比较上述两种方法。

        // 全内联 (默认的步骤):

        // - 分配的临时内存: 0

        // - Loads: 0

        // - Stores: 16

        // - Calls to sin: 64

        // producer.compute_root():

        // - 分配的临时内存: 25 floats

        // - Loads: 64

        // - Stores: 41

        // - Calls to sin: 25

        // 这里需要权衡。 全内联使用了最少的临时内存和内存带宽,但是做了一大堆多余的昂贵数学运算(称为sin)。 它四次评估了“生产者”中的大多数得分。 第二个调度程序producer.compute_root()执行了对sin的最小调用,但是使用了更多的临时内存和更多的内存带宽。

        // 在任何给定情况下,都很难做出正确的选择。 如果您的内存带宽有限,或者没有太多的内存(例如,因为您使用的是旧手机),那么进行冗余数学运算就很有意义。 另一方面,Sin代价昂贵,因此,如果您的计算受到限制,那么对sin的调用会减少,从而使您的程序更快。 添加矢量化或多核并行度会倾斜比例,以便进行冗余工作,因为启动多个cpu核会增加每秒可以执行的数学运算量,但不会增加系统内存带宽或容量。

        // 我们可以在完整内联和compute_root之间进行选择。 接下来,我们将在根据每个扫描线计算生产者和消费者之间交替进行:

        {

            // 从相同的函数定义开始:

            Func producer("producer_y"), consumer("consumer_y");

            producer(x, y) = sin(x * y);

            consumer(x, y) = (producer(x, y) +

                              producer(x, y+1) +

                              producer(x+1, y) +

                              producer(x+1, y+1))/4;

            // 告诉Halide,对于consumer的每个y坐标,根据需要评估producer:

            producer.compute_at(consumer, y);

            // 这会将计算producer的代码放在y的for循环内*,如下面的等效C所示.

            // 打开跟踪.

            producer.trace_stores();

            consumer.trace_stores();

            // 编译运行.

            printf("\nEvaluating producer.compute_at(consumer, y)\n");

            consumer.realize(4, 4);

    ----------------------------------------------------------------------

    > Begin pipeline consumer_y.0()

    > Store producer_y.0(0, 0) = 0.000000

    > Store producer_y.0(1, 0) = 0.000000

    > Store producer_y.0(2, 0) = 0.000000

    > Store producer_y.0(3, 0) = 0.000000

    > Store producer_y.0(4, 0) = 0.000000

    > Store producer_y.0(0, 1) = 0.000000

    > Store producer_y.0(1, 1) = 0.841471

    > Store producer_y.0(2, 1) = 0.909297

    > Store producer_y.0(3, 1) = 0.141120

    > Store producer_y.0(4, 1) = -0.756802

    > Store consumer_y.0(0, 0) = 0.210368

    > Store consumer_y.0(1, 0) = 0.437692

    > Store consumer_y.0(2, 0) = 0.262604

    > Store consumer_y.0(3, 0) = -0.153921

    > Store producer_y.0(0, 1) = 0.000000

    > Store producer_y.0(1, 1) = 0.841471

    > Store producer_y.0(2, 1) = 0.909297

    > Store producer_y.0(3, 1) = 0.141120

    > Store producer_y.0(4, 1) = -0.756802

    > Store producer_y.0(0, 2) = 0.000000

    > Store producer_y.0(1, 2) = 0.909297

    > Store producer_y.0(2, 2) = -0.756802

    > Store producer_y.0(3, 2) = -0.279415

    > Store producer_y.0(4, 2) = 0.989358

    > Store consumer_y.0(0, 1) = 0.437692

    > Store consumer_y.0(1, 1) = 0.475816

    > Store consumer_y.0(2, 1) = 0.003550

    > Store consumer_y.0(3, 1) = 0.023565

    > Store producer_y.0(0, 2) = 0.000000

    > Store producer_y.0(1, 2) = 0.909297

    > Store producer_y.0(2, 2) = -0.756802

    > Store producer_y.0(3, 2) = -0.279415

    > Store producer_y.0(4, 2) = 0.989358

    > Store producer_y.0(0, 3) = 0.000000

    > Store producer_y.0(1, 3) = 0.141120

    > Store producer_y.0(2, 3) = -0.279415

    > Store producer_y.0(3, 3) = 0.412118

    > Store producer_y.0(4, 3) = -0.536573

    > Store consumer_y.0(0, 2) = 0.262604

    > Store consumer_y.0(1, 2) = 0.003550

    > Store consumer_y.0(2, 2) = -0.225879

    > Store consumer_y.0(3, 2) = 0.146372

    > Store producer_y.0(0, 3) = 0.000000

    > Store producer_y.0(1, 3) = 0.141120

    > Store producer_y.0(2, 3) = -0.279415

    > Store producer_y.0(3, 3) = 0.412118

    > Store producer_y.0(4, 3) = -0.536573

    > Store producer_y.0(0, 4) = 0.000000

    > Store producer_y.0(1, 4) = -0.756802

    > Store producer_y.0(2, 4) = 0.989358

    > Store producer_y.0(3, 4) = -0.536573

    > Store producer_y.0(4, 4) = -0.287903

    > Store consumer_y.0(0, 3) = -0.153921

    > Store consumer_y.0(1, 3) = 0.023565

    > Store consumer_y.0(2, 3) = 0.146372

    > Store consumer_y.0(3, 3) = -0.237233

    > End pipeline consumer_y.0()

    ---------------------------------------------------------------------

            // 如下可视化图.

            图82

            // 阅读日志或查看该图,您应该看到producer和consumer在每条扫描线的基础上交替出现。 让我们看看等效的C:

            float result[4][4];

            // consumer的扫描线存在外循环:

            for (int y = 0; y < 4; y++) {

                // 分配空间并计算producer的足够数量,以满足consumer的这一条扫描线。 这意味着producer的box大小为5x2。

                float producer_storage[2][5];

                for (int py = y; py < y + 2; py++) {

                    for (int px = 0; px < 5; px++) {

                        producer_storage[py-y][px] = sin(px * py);

                    }

                }

                // 计算consumer的扫描线 .

                for (int x = 0; x < 4; x++) {

                    result[y][x] = (producer_storage[0][x] +

                                    producer_storage[1][x] +

                                    producer_storage[0][x+1] +

                                    producer_storage[1][x+1])/4;

                }

            }

            // 同样,如果我们打印循环嵌套,我们将看到与上面的C非常相似的内容。

            printf("Pseudo-code for the schedule:\n");

            consumer.print_loop_nest();

            printf("\n");

    -------------------------------------------------------------------------------

    > produce consumer_y:

    >  for y:

    >    produce producer_y:

    >      for y:

    >        for x:

    >          producer_y(...) = ...

    >    consume producer_y:

    >      for x:

    >        consumer_y(...) = ...

    ------------------------------------------------------------------------------

            // 该策略的性能特征介于内联和计算root之间。 我们仍然分配一些临时内存,但分配的内存要少一些,并且具有更好的局部性(在写入后不久便从中加载它,因此对于较大的图像,值仍应位于缓存中)。 我们仍然做了一些多余的工作,但还不到完整的内联:

            // producer.compute_at(consumer, y):

            // - 分配的临时内存: 10 floats

            // - Loads: 64

            // - Stores: 56

            // - Calls to sin: 40

        }

        // 我们也可以说producer.compute_at(consumer,x),但这与完全内联(默认计划)非常相似。 相反,让我们区分为producer分配存储的循环级别和实际计算存储的循环级别。 这解锁了一些优化.

        {

            Func producer("producer_root_y"), consumer("consumer_root_y");

            producer(x, y) = sin(x * y);

            consumer(x, y) = (producer(x, y) +

                              producer(x, y+1) +

                              producer(x+1, y) +

                              producer(x+1, y+1))/4;

            // 告诉Halide创建一个缓冲区,以在最外层存储所有producer:

            producer.store_root();

            // ... 但需要根据consumer的y坐标进行计算.

            producer.compute_at(consumer, y);

            producer.trace_stores();

            consumer.trace_stores();

            printf("\nEvaluating producer.store_root().compute_at(consumer, y)\n");

            consumer.realize(4, 4);

    -------------------------------------------------------------------

    > Begin pipeline consumer_root_y.0()

    > Store producer_root_y.0(0, 0) = 0.000000

    > Store producer_root_y.0(1, 0) = 0.000000

    > Store producer_root_y.0(2, 0) = 0.000000

    > Store producer_root_y.0(3, 0) = 0.000000

    > Store producer_root_y.0(4, 0) = 0.000000

    > Store producer_root_y.0(0, 1) = 0.000000

    > Store producer_root_y.0(1, 1) = 0.841471

    > Store producer_root_y.0(2, 1) = 0.909297

    > Store producer_root_y.0(3, 1) = 0.141120

    > Store producer_root_y.0(4, 1) = -0.756802

    > Store consumer_root_y.0(0, 0) = 0.210368

    > Store consumer_root_y.0(1, 0) = 0.437692

    > Store consumer_root_y.0(2, 0) = 0.262604

    > Store consumer_root_y.0(3, 0) = -0.153921

    > Store producer_root_y.0(0, 2) = 0.000000

    > Store producer_root_y.0(1, 2) = 0.909297

    > Store producer_root_y.0(2, 2) = -0.756802

    > Store producer_root_y.0(3, 2) = -0.279415

    > Store producer_root_y.0(4, 2) = 0.989358

    > Store consumer_root_y.0(0, 1) = 0.437692

    > Store consumer_root_y.0(1, 1) = 0.475816

    > Store consumer_root_y.0(2, 1) = 0.003550

    > Store consumer_root_y.0(3, 1) = 0.023565

    > Store producer_root_y.0(0, 3) = 0.000000

    > Store producer_root_y.0(1, 3) = 0.141120

    > Store producer_root_y.0(2, 3) = -0.279415

    > Store producer_root_y.0(3, 3) = 0.412118

    > Store producer_root_y.0(4, 3) = -0.536573

    > Store consumer_root_y.0(0, 2) = 0.262604

    > Store consumer_root_y.0(1, 2) = 0.003550

    > Store consumer_root_y.0(2, 2) = -0.225879

    > Store consumer_root_y.0(3, 2) = 0.146372

    > Store producer_root_y.0(0, 4) = 0.000000

    > Store producer_root_y.0(1, 4) = -0.756802

    > Store producer_root_y.0(2, 4) = 0.989358

    > Store producer_root_y.0(3, 4) = -0.536573

    > Store producer_root_y.0(4, 4) = -0.287903

    > Store consumer_root_y.0(0, 3) = -0.153921

    > Store consumer_root_y.0(1, 3) = 0.023565

    > Store consumer_root_y.0(2, 3) = 0.146372

    > Store consumer_root_y.0(3, 3) = -0.237233

    > End pipeline consumer_root_y.0()

    -------------------------------------------------------------------

            //下图是可视化图。

            图83

            // 阅读日志或查看该图,您应该看到producer和consumer再次基于每个扫描线交替使用。 它计算producer的5x2框以满足consumer的第一条扫描线,但是之后,它仅为consumer的每条新扫描线计算输出的5x1框!

            //

            // Halide已检测到,除第一个扫描线外,所有扫描线都可以重用已经存在于我们为producer分配的缓冲区中的值。 让我们看一下等效的C:

            float result[4][4];

            // producer.store_root() 意味着存储在这里:

            float producer_storage[5][5];

            // consumer的扫描线存在外部循环:

            for (int y = 0; y < 4; y++) {

                //计算足够的producer以满足consumer的需求。

                for (int py = y; py < y + 2; py++) {

                    // 跳过我们在上一次迭代中已经计算出的producer行.

                    if (y > 0 && py == y) continue;

                    for (int px = 0; px < 5; px++) {

                        producer_storage[py][px] = sin(px * py);

                    }

                }

                // 计算consumer的扫描线.

                for (int x = 0; x < 4; x++) {

                    result[y][x] = (producer_storage[y][x] +

                                    producer_storage[y+1][x] +

                                    producer_storage[y][x+1] +

                                    producer_storage[y+1][x+1])/4;

                }

            }

            printf("Pseudo-code for the schedule:\n");

            consumer.print_loop_nest();

            printf("\n");

    --------------------------------------------------------------------------

    > store producer_root_y:

    >  produce consumer_root_y:

    >    for y:

    >      produce producer_root_y:

    >        for y:

    >          for x:

    >            producer_root_y(...) = ...

    >      consume producer_root_y:

    >        for x:

    >          consumer_root_y(...) = ...

    -------------------------------------------------------------------

            // 此策略的性能特征非常好! 这些数字与compute_root类似,但局部性更好。 我们正在执行最少数量的sin调用,并且在存储值之后立即加载值,因此我们可能会充分利用缓存:

            // producer.store_root().compute_at(consumer, y):

            // - 分配的临时内存: 10 floats

            // - Loads: 64

            // - Stores: 39

            // - Calls to sin: 25

            // 请注意,我声称的分配的内存量与参考C代码不匹配。 Halide正在后台进行另一项优化。 它将producer的存储折叠到两条扫描线的循环缓冲区中。 等效的C实际上看起来像这样:

            {

                // 实际存储2条扫描线,而不是5条

                float producer_storage[2][5];

                for (int y = 0; y < 4; y++) {

                    for (int py = y; py < y + 2; py++) {

                        if (y > 0 && py == y) continue;

                        for (int px = 0; px < 5; px++) {

                            // 到producer_storage的存储的y坐标是位掩码的.

                            producer_storage[py & 1][px] = sin(px * py);

                        }

                    }

                    // 计算consumer的扫描线.

                    for (int x = 0; x < 4; x++) {

                        // 来自producer_storage的负载的y坐标为位掩码.

                        result[y][x] = (producer_storage[y & 1][x] +

                                        producer_storage[(y+1) & 1][x] +

                                        producer_storage[y & 1][x+1] +

                                        producer_storage[(y+1) & 1][x+1])/4;

                    }

                }

            }

        }

        // 我们可以做得更好,将存储留在最外层,但是将计算移到最内层的循环中:

        {

            Func producer("producer_root_x"), consumer("consumer_root_x");

            producer(x, y) = sin(x * y);

            consumer(x, y) = (producer(x, y) +

                              producer(x, y+1) +

                              producer(x+1, y) +

                              producer(x+1, y+1))/4;

            // 存储最外层,计算最内层。

            producer.store_root().compute_at(consumer, x);

            producer.trace_stores();

            consumer.trace_stores();

            printf("\nEvaluating producer.store_root().compute_at(consumer, x)\n");

            consumer.realize(4, 4);

    ------------------------------------------------------------------

    > Begin pipeline consumer_root_x.0()

    > Store producer_root_x.0(0, 0) = 0.000000

    > Store producer_root_x.0(1, 0) = 0.000000

    > Store producer_root_x.0(0, 1) = 0.000000

    > Store producer_root_x.0(1, 1) = 0.841471

    > Store consumer_root_x.0(0, 0) = 0.210368

    > Store producer_root_x.0(2, 0) = 0.000000

    > Store producer_root_x.0(2, 1) = 0.909297

    > Store consumer_root_x.0(1, 0) = 0.437692

    > Store producer_root_x.0(3, 0) = 0.000000

    > Store producer_root_x.0(3, 1) = 0.141120

    > Store consumer_root_x.0(2, 0) = 0.262604

    > Store producer_root_x.0(4, 0) = 0.000000

    > Store producer_root_x.0(4, 1) = -0.756802

    > Store consumer_root_x.0(3, 0) = -0.153921

    > Store producer_root_x.0(0, 2) = 0.000000

    > Store producer_root_x.0(1, 2) = 0.909297

    > Store consumer_root_x.0(0, 1) = 0.437692

    > Store producer_root_x.0(2, 2) = -0.756802

    > Store consumer_root_x.0(1, 1) = 0.475816

    > Store producer_root_x.0(3, 2) = -0.279415

    > Store consumer_root_x.0(2, 1) = 0.003550

    > Store producer_root_x.0(4, 2) = 0.989358

    > Store consumer_root_x.0(3, 1) = 0.023565

    > Store producer_root_x.0(0, 3) = 0.000000

    > Store producer_root_x.0(1, 3) = 0.141120

    > Store consumer_root_x.0(0, 2) = 0.262604

    > Store producer_root_x.0(2, 3) = -0.279415

    > Store consumer_root_x.0(1, 2) = 0.003550

    > Store producer_root_x.0(3, 3) = 0.412118

    > Store consumer_root_x.0(2, 2) = -0.225879

    > Store producer_root_x.0(4, 3) = -0.536573

    > Store consumer_root_x.0(3, 2) = 0.146372

    > Store producer_root_x.0(0, 4) = 0.000000

    > Store producer_root_x.0(1, 4) = -0.756802

    > Store consumer_root_x.0(0, 3) = -0.153921

    > Store producer_root_x.0(2, 4) = 0.989358

    > Store consumer_root_x.0(1, 3) = 0.023565

    > Store producer_root_x.0(3, 4) = -0.536573

    > Store consumer_root_x.0(2, 3) = 0.146372

    > Store producer_root_x.0(4, 4) = -0.287903

    > Store consumer_root_x.0(3, 3) = -0.237233

    > End pipeline consumer_root_x.0()

    --------------------------------------------------------------------

            // 可视化图如下

            图84

            // 您应该看到producer和consumer现在按像素交替。 这是等效的C:

            float result[4][4];

            // producer.store_root() 意味着存储在这里,但是我们可以将其折叠到两条扫描线的循环缓冲区中:

            float producer_storage[2][5];

            // 对于consumer的每个像素:

            for (int y = 0; y < 4; y++) {

                for (int x = 0; x < 4; x++) {

                    // 计算足够的producer以满足consumer的这个像素,但是跳过我们已经计算出的值:

                    if (y == 0 && x == 0)

                        producer_storage[y & 1][x] = sin(x*y);

                    if (y == 0)

                        producer_storage[y & 1][x+1] = sin((x+1)*y);

                    if (x == 0)

                        producer_storage[(y+1) & 1][x] = sin(x*(y+1));

                    producer_storage[(y+1) & 1][x+1] = sin((x+1)*(y+1));

                    result[y][x] = (producer_storage[y & 1][x] +

                                    producer_storage[(y+1) & 1][x] +

                                    producer_storage[y & 1][x+1] +

                                    producer_storage[(y+1) & 1][x+1])/4;

                }

            }

            printf("Pseudo-code for the schedule:\n");

            consumer.print_loop_nest();

            printf("\n");

    ----------------------------------------------------------------

    > store producer_root_x:

    >  produce consumer_root_x:

    >    for y:

    >      for x:

    >        produce producer_root_x:

    >          for y:

    >            for x:

    >              producer_root_x(...) = ...

    >        consume producer_root_x:

    >          consumer_root_x(...) = ...

    --------------------------------------------------------------

            // 该策略的性能特征是迄今为止最好的。 我们需要的producer的四个值之一可能仍然位于寄存器中,因此我不会将其视为负载:

            // producer.store_root().compute_at(consumer, x):

            // - 分配的临时内存: 10 floats

            // - Loads: 48

            // - Stores: 56

            // - Calls to sin: 40

        }

        // 那么有什么收获呢? 为什么不总是为这种类型的代码始终使用producer.store_root()。compute_at(consumer,x)?

        //

        // 答案是并行性。 在前两种策略中,我们都假设先前迭代计算出的值可供我们重用。 这假定x或y的先前值在时间上较早发生并已完成。 如果并行化或向量化任一循环,则情况并非如此。 如果您进行并行化,那么在store_at级别和compute_at级别之间存在并行循环时,Halide将不会注入跳过已经完成的优化的操作,并且也不会将存储向下折叠到循环缓冲区中,这会使我们的store_root毫无意义 。

        // 我们的选项已用完。 我们可以拆分创建新的。 我们可以在consumer的自然变量(x和y)上存储store_at或compute_at,也可以将x或y拆分为新的内部和外部子变量,然后针对它们进行调度。 我们将使用它来表示图块中的融合:

        {

            Func producer("producer_tile"), consumer("consumer_tile");

            producer(x, y) = sin(x * y);

            consumer(x, y) = (producer(x, y) +

                              producer(x, y+1) +

                              producer(x+1, y) +

                              producer(x+1, y+1))/4;

            // 我们将以4x4格来计算8x8的consumer.

            Var x_outer, y_outer, x_inner, y_inner;

            consumer.tile(x, y, x_outer, y_outer, x_inner, y_inner, 4, 4);

            // 根据consumer的每块计算producer

            producer.compute_at(consumer, x_outer);

            // 请注意,我是从管道(consumer)的末端开始编写时间表的。 这是因为producer的计划表引用了x_outer,这是我们在平铺consumer时引入的。 您可以按其他顺序编写它,但是它往往很难阅读。

            // 打开追踪.

            producer.trace_stores();

            consumer.trace_stores();

            printf("\nEvaluating:\n"

                  "consumer.tile(x, y, x_outer, y_outer, x_inner, y_inner, 4, 4);\n"

                  "producer.compute_at(consumer, x_outer);\n");

            consumer.realize(8, 8);

    ---------------------------------------------------------------------

    > Begin pipeline consumer_tile.0()

    > Store producer_tile.0(0, 0) = 0.000000

    > Store producer_tile.0(1, 0) = 0.000000

    > Store producer_tile.0(2, 0) = 0.000000

    > Store producer_tile.0(3, 0) = 0.000000

    > Store producer_tile.0(4, 0) = 0.000000

    > Store producer_tile.0(0, 1) = 0.000000

    > Store producer_tile.0(1, 1) = 0.841471

    > Store producer_tile.0(2, 1) = 0.909297

    > Store producer_tile.0(3, 1) = 0.141120

    > Store producer_tile.0(4, 1) = -0.756802

    > Store producer_tile.0(0, 2) = 0.000000

    > Store producer_tile.0(1, 2) = 0.909297

    > Store producer_tile.0(2, 2) = -0.756802

    > Store producer_tile.0(3, 2) = -0.279415

    > Store producer_tile.0(4, 2) = 0.989358

    > Store producer_tile.0(0, 3) = 0.000000

    > Store producer_tile.0(1, 3) = 0.141120

    > Store producer_tile.0(2, 3) = -0.279415

    > Store producer_tile.0(3, 3) = 0.412118

    > Store producer_tile.0(4, 3) = -0.536573

    > Store producer_tile.0(0, 4) = 0.000000

    > Store producer_tile.0(1, 4) = -0.756802

    > Store producer_tile.0(2, 4) = 0.989358

    > Store producer_tile.0(3, 4) = -0.536573

    > Store producer_tile.0(4, 4) = -0.287903

    > Store consumer_tile.0(0, 0) = 0.210368

    > Store consumer_tile.0(1, 0) = 0.437692

    > Store consumer_tile.0(2, 0) = 0.262604

    > Store consumer_tile.0(3, 0) = -0.153921

    > Store consumer_tile.0(0, 1) = 0.437692

    > Store consumer_tile.0(1, 1) = 0.475816

    > Store consumer_tile.0(2, 1) = 0.003550

    > Store consumer_tile.0(3, 1) = 0.023565

    > Store consumer_tile.0(0, 2) = 0.262604

    > Store consumer_tile.0(1, 2) = 0.003550

    > Store consumer_tile.0(2, 2) = -0.225879

    > Store consumer_tile.0(3, 2) = 0.146372

    > Store consumer_tile.0(0, 3) = -0.153921

    > Store consumer_tile.0(1, 3) = 0.023565

    > Store consumer_tile.0(2, 3) = 0.146372

    > Store consumer_tile.0(3, 3) = -0.237233

    > Store producer_tile.0(4, 0) = 0.000000

    > Store producer_tile.0(5, 0) = 0.000000

    > Store producer_tile.0(6, 0) = 0.000000

    > Store producer_tile.0(7, 0) = 0.000000

    > Store producer_tile.0(8, 0) = 0.000000

    > Store producer_tile.0(4, 1) = -0.756802

    > Store producer_tile.0(5, 1) = -0.958924

    > Store producer_tile.0(6, 1) = -0.279415

    > Store producer_tile.0(7, 1) = 0.656987

    > Store producer_tile.0(8, 1) = 0.989358

    > Store producer_tile.0(4, 2) = 0.989358

    > Store producer_tile.0(5, 2) = -0.544021

    > Store producer_tile.0(6, 2) = -0.536573

    > Store producer_tile.0(7, 2) = 0.990607

    > Store producer_tile.0(8, 2) = -0.287903

    > Store producer_tile.0(4, 3) = -0.536573

    > Store producer_tile.0(5, 3) = 0.650288

    > Store producer_tile.0(6, 3) = -0.750987

    > Store producer_tile.0(7, 3) = 0.836656

    > Store producer_tile.0(8, 3) = -0.905578

    > Store producer_tile.0(4, 4) = -0.287903

    > Store producer_tile.0(5, 4) = 0.912945

    > Store producer_tile.0(6, 4) = -0.905578

    > Store producer_tile.0(7, 4) = 0.270906

    > Store producer_tile.0(8, 4) = 0.551427

    > Store consumer_tile.0(4, 0) = -0.428932

    > Store consumer_tile.0(5, 0) = -0.309585

    > Store consumer_tile.0(6, 0) = 0.094393

    > Store consumer_tile.0(7, 0) = 0.411586

    > Store consumer_tile.0(4, 1) = -0.317597

    > Store consumer_tile.0(5, 1) = -0.579733

    > Store consumer_tile.0(6, 1) = 0.207901

    > Store consumer_tile.0(7, 1) = 0.587262

    > Store consumer_tile.0(4, 2) = 0.139763

    > Store consumer_tile.0(5, 2) = -0.295323

    > Store consumer_tile.0(6, 2) = 0.134926

    > Store consumer_tile.0(7, 2) = 0.158445

    > Store consumer_tile.0(4, 3) = 0.184689

    > Store consumer_tile.0(5, 3) = -0.023333

    > Store consumer_tile.0(6, 3) = -0.137251

    > Store consumer_tile.0(7, 3) = 0.188352

    > Store producer_tile.0(0, 4) = 0.000000

    > Store producer_tile.0(1, 4) = -0.756802

    > Store producer_tile.0(2, 4) = 0.989358

    > Store producer_tile.0(3, 4) = -0.536573

    > Store producer_tile.0(4, 4) = -0.287903

    > Store producer_tile.0(0, 5) = 0.000000

    > Store producer_tile.0(1, 5) = -0.958924

    > Store producer_tile.0(2, 5) = -0.544021

    > Store producer_tile.0(3, 5) = 0.650288

    > Store producer_tile.0(4, 5) = 0.912945

    > Store producer_tile.0(0, 6) = 0.000000

    > Store producer_tile.0(1, 6) = -0.279415

    > Store producer_tile.0(2, 6) = -0.536573

    > Store producer_tile.0(3, 6) = -0.750987

    > Store producer_tile.0(4, 6) = -0.905578

    > Store producer_tile.0(0, 7) = 0.000000

    > Store producer_tile.0(1, 7) = 0.656987

    > Store producer_tile.0(2, 7) = 0.990607

    > Store producer_tile.0(3, 7) = 0.836656

    > Store producer_tile.0(4, 7) = 0.270906

    > Store producer_tile.0(0, 8) = 0.000000

    > Store producer_tile.0(1, 8) = 0.989358

    > Store producer_tile.0(2, 8) = -0.287903

    > Store producer_tile.0(3, 8) = -0.905578

    > Store producer_tile.0(4, 8) = 0.551427

    > Store consumer_tile.0(0, 4) = -0.428932

    > Store consumer_tile.0(1, 4) = -0.317597

    > Store consumer_tile.0(2, 4) = 0.139763

    > Store consumer_tile.0(3, 4) = 0.184689

    > Store consumer_tile.0(0, 5) = -0.309585

    > Store consumer_tile.0(1, 5) = -0.579733

    > Store consumer_tile.0(2, 5) = -0.295323

    > Store consumer_tile.0(3, 5) = -0.023333

    > Store consumer_tile.0(0, 6) = 0.094393

    > Store consumer_tile.0(1, 6) = 0.207901

    > Store consumer_tile.0(2, 6) = 0.134926

    > Store consumer_tile.0(3, 6) = -0.137251

    > Store consumer_tile.0(0, 7) = 0.411586

    > Store consumer_tile.0(1, 7) = 0.587262

    > Store consumer_tile.0(2, 7) = 0.158445

    > Store consumer_tile.0(3, 7) = 0.188352

    > Store producer_tile.0(4, 4) = -0.287903

    > Store producer_tile.0(5, 4) = 0.912945

    > Store producer_tile.0(6, 4) = -0.905578

    > Store producer_tile.0(7, 4) = 0.270906

    > Store producer_tile.0(8, 4) = 0.551427

    > Store producer_tile.0(4, 5) = 0.912945

    > Store producer_tile.0(5, 5) = -0.132352

    > Store producer_tile.0(6, 5) = -0.988032

    > Store producer_tile.0(7, 5) = -0.428183

    > Store producer_tile.0(8, 5) = 0.745113

    > Store producer_tile.0(4, 6) = -0.905578

    > Store producer_tile.0(5, 6) = -0.988032

    > Store producer_tile.0(6, 6) = -0.991779

    > Store producer_tile.0(7, 6) = -0.916522

    > Store producer_tile.0(8, 6) = -0.768255

    > Store producer_tile.0(4, 7) = 0.270906

    > Store producer_tile.0(5, 7) = -0.428183

    > Store producer_tile.0(6, 7) = -0.916522

    > Store producer_tile.0(7, 7) = -0.953753

    > Store producer_tile.0(8, 7) = -0.521551

    > Store producer_tile.0(4, 8) = 0.551427

    > Store producer_tile.0(5, 8) = 0.745113

    > Store producer_tile.0(6, 8) = -0.768255

    > Store producer_tile.0(7, 8) = -0.521551

    > Store producer_tile.0(8, 8) = 0.920026

    > Store consumer_tile.0(4, 4) = 0.351409

    > Store consumer_tile.0(5, 4) = -0.278254

    > Store consumer_tile.0(6, 4) = -0.512722

    > Store consumer_tile.0(7, 4) = 0.284816

    > Store consumer_tile.0(4, 5) = -0.278254

    > Store consumer_tile.0(5, 5) = -0.775048

    > Store consumer_tile.0(6, 5) = -0.831129

    > Store consumer_tile.0(7, 5) = -0.341961

    > Store consumer_tile.0(4, 6) = -0.512722

    > Store consumer_tile.0(5, 6) = -0.831129

    > Store consumer_tile.0(6, 6) = -0.944644

    > Store consumer_tile.0(7, 6) = -0.790020

    > Store consumer_tile.0(4, 7) = 0.284816

    > Store consumer_tile.0(5, 7) = -0.341961

    > Store consumer_tile.0(6, 7) = -0.790020

    > Store consumer_tile.0(7, 7) = -0.269207

    > End pipeline consumer_tile.0()

    -------------------------------------------------------------------

            // 可视化图如下.

            图85

            // producer和consumer现在按每个区块交替。 这是等效的C:

            float result[8][8];

            // 对于consumer的每块:

            for (int y_outer = 0; y_outer < 2; y_outer++) {

                for (int x_outer = 0; x_outer < 2; x_outer++) {

                    // 计算此图块开始处的x和y坐标.

                    int x_base = x_outer*4;

                    int y_base = y_outer*4;

                    // 计算足够的producer来满足此图块。 consumer的4x4拼贴需要producer的5x5拼贴。

                    float producer_storage[5][5];

                    for (int py = y_base; py < y_base + 5; py++) {

                        for (int px = x_base; px < x_base + 5; px++) {

                            producer_storage[py-y_base][px-x_base] = sin(px * py);

                        }

                    }

                    // 计算消费者的这个图块

                    for (int y_inner = 0; y_inner < 4; y_inner++) {

                        for (int x_inner = 0; x_inner < 4; x_inner++) {

                            int x = x_base + x_inner;

                            int y = y_base + y_inner;

                            result[y][x] =

                                (producer_storage[y - y_base][x - x_base] +

                                producer_storage[y - y_base + 1][x - x_base] +

                                producer_storage[y - y_base][x - x_base + 1] +

                                producer_storage[y - y_base + 1][x - x_base + 1])/4;

                        }

                    }

                }

            }

            printf("Pseudo-code for the schedule:\n");

            consumer.print_loop_nest();

            printf("\n");

    -------------------------------------------------------------------

    > produce consumer_tile:

    >  for y.y_outer:

    >    for x.x_outer:

    >      produce producer_tile:

    >        for y:

    >          for x:

    >            producer_tile(...) = ...

    >      consume producer_tile:

    >        for y.y_inner in [0, 3]:

    >          for x.x_inner in [0, 3]:

    >            consumer_tile(...) = ...

    ------------------------------------------------------------------

            // 对于像这样的问题,使用在x和y处向外延伸的模具,平铺可能很有意义。 每个图块可以并行独立地计算,并且一旦图块变得足够大,每个图块完成的冗余工作就不会太糟糕。

        }

        // 让我们尝试一种混合策略,该策略将我们已完成的拆分,并行化和向量化处理结合在一起。 对于大型图像,这通常在实践中效果很好。 如果您了解此进程,那么您将了解Halide中95%的进程。

        {

            Func producer("producer_mixed"), consumer("consumer_mixed");

            producer(x, y) = sin(x * y);

            consumer(x, y) = (producer(x, y) +

                              producer(x, y+1) +

                              producer(x+1, y) +

                              producer(x+1, y+1))/4;

            // 将consumer的y坐标分成16条扫描线:

            Var yo, yi;

            consumer.split(y, yo, yi, 16);

            // 使用线程池和任务队列计算strip.

            consumer.parallel(yo);

            // 在x上向量化四倍.

            consumer.vectorize(x, 4);

            // 现在按条存储producer。 这将是producer的17条扫描线(16 + 1),但希望它将折叠成两条扫描线的循环缓冲区:

            producer.store_at(consumer, yo);

            // 在每个strip中,计算consumer每条扫描线的生产者,而跳过先前扫描线完成的工作。

            producer.compute_at(consumer, yi);

            // 还要对producer进行矢量化处理(因为可以使用SSE在x86上对sin进行矢量化处理)。

            producer.vectorize(x, 4);

            // 我们这次不进行跟踪,因为我们将在更大的图像上进行评估。

            // consumer.trace_stores();

            // producer.trace_stores();

            Buffer<float> halide_result = consumer.realize(160, 160);

            // 下图是可视化图

            图86

            // 等效的C代码:

            float c_result[160][160];

            // 对于16条扫描线的每个strip(在Halide版本中此循环是并行的)

            for (int yo = 0; yo < 160/16 + 1; yo++) {

                // 16不会除以160,因此将最后一个片段向上推以适合[0,159](请参阅第05章)。

                int y_base = yo * 16;

                if (y_base > 160-16) y_base = 160-16;

                // 为producer分配两个扫描线循环缓冲区

                float producer_storage[2][161];

                // 对于16 strip中的每条扫描线

                for (int yi = 0; yi < 16; yi++) {

                    int y = y_base + yi;

                    for (int py = y; py < y+2; py++) {

                        // 跳过已经在此任务内计算出的扫描线

                        if (yi > 0 && py == y) continue;

                        // 用4宽度向量计算producer的扫描线

                        for (int x_vec = 0; x_vec < 160/4 + 1; x_vec++) {

                            int x_base = x_vec*4;

                            // 4不会除以161,因此将最后一个向量向左推(请参阅第05章).

                            if (x_base > 161 - 4) x_base = 161 - 4;

                            // 如果您使用的是x86,则Halide会为此部分生成SSE代码:

                            int x[] = {x_base, x_base + 1, x_base + 2, x_base + 3};

                            float vec[4] = {sinf(x[0] * py), sinf(x[1] * py),

                                            sinf(x[2] * py), sinf(x[3] * py)};

                            producer_storage[py & 1][x[0]] = vec[0];

                            producer_storage[py & 1][x[1]] = vec[1];

                            producer_storage[py & 1][x[2]] = vec[2];

                            producer_storage[py & 1][x[3]] = vec[3];

                        }

                    }

                    // 现在计算此扫描线的consumer:

                    for (int x_vec = 0; x_vec < 160/4; x_vec++) {

                        int x_base = x_vec * 4;

                        // 同样,此处的Halide等效项使用SSE.

                        int x[] = {x_base, x_base + 1, x_base + 2, x_base + 3};

                        float vec[] = {

                            (producer_storage[y & 1][x[0]] +

                            producer_storage[(y+1) & 1][x[0]] +

                            producer_storage[y & 1][x[0]+1] +

                            producer_storage[(y+1) & 1][x[0]+1])/4,

                            (producer_storage[y & 1][x[1]] +

                            producer_storage[(y+1) & 1][x[1]] +

                            producer_storage[y & 1][x[1]+1] +

                            producer_storage[(y+1) & 1][x[1]+1])/4,

                            (producer_storage[y & 1][x[2]] +

                            producer_storage[(y+1) & 1][x[2]] +

                            producer_storage[y & 1][x[2]+1] +

                            producer_storage[(y+1) & 1][x[2]+1])/4,

                            (producer_storage[y & 1][x[3]] +

                            producer_storage[(y+1) & 1][x[3]] +

                            producer_storage[y & 1][x[3]+1] +

                            producer_storage[(y+1) & 1][x[3]+1])/4};

                        c_result[y][x[0]] = vec[0];

                        c_result[y][x[1]] = vec[1];

                        c_result[y][x[2]] = vec[2];

                        c_result[y][x[3]] = vec[3];

                    }

                }

            }

            printf("Pseudo-code for the schedule:\n");

            consumer.print_loop_nest();

            printf("\n");

    ----------------------------------------------------------------

    > produce consumer_mixed:

    >  parallel y.yo:

    >    store producer_mixed:

    >      for y.yi in [0, 15]:

    >        produce producer_mixed:

    >          for y:

    >            for x.x:

    >              vectorized x.v39 in [0, 3]:

    >                producer_mixed(...) = ...

    >        consume producer_mixed:

    >          for x.x:

    >            vectorized x.v36 in [0, 3]:

    >              consumer_mixed(...) = ...

    ------------------------------------------------------------------

            // 看我的代码,你们强大而绝望!

            // 让我们对照halide结果检查C结果。 这样做后,我在C实现中发现了几个错误,这些错误应该告诉您一些信息。

            for (int y = 0; y < 160; y++) {

                for (int x = 0; x < 160; x++) {

                    float error = halide_result(x, y) - c_result[y][x];

                    // 这是浮点数学运算,所以我们允许一些误差:

                    if (error < -0.001f || error > 0.001f) {

                        printf("halide_result(%d, %d) = %f instead of %f\n",

                              x, y, halide_result(x, y), c_result[y][x]);

                        return -1;

                    }

                }

            }

        }

        // 这东西很难。 我们最终在内存带宽,冗余工作和并行性之间进行了三步权衡。 halide无法自动为您做出正确的选择(抱歉)。 相反,它试图使您更轻松地探索各种选项,而不会弄乱您的程序。 实际上,Halide承诺,诸如compute_root之类的调度调用不会改变算法的含义-无论如何调度,您都应该获得相同的位。

        // 因此要凭经验! 试用各种时间表,并记录性能。 形成假设,然后尝试证明自己是错误的。 不必假设您只需要将代码矢量化四倍,然后在八个内核上运行它,就会使速度提高32倍。 这几乎是行不通的。 现代系统非常复杂,以至于如果不运行代码就无法可靠地预测性能。

        //我们建议您首先安排所有非平凡阶段的compute_root,然后从流水线末端开始向上,依次内联,并行化和向量化各个阶段,直到到达顶部为止。

        //Halide不仅涉及向量化和并行化代码。 这还不足以使您走得更远。 Halide旨在为您提供工具,以帮助您快速探索局部性,冗余工作和并行性之间的不同权衡,而不会弄乱您要计算的实际结果。

        printf("Success!\n");

        return 0;

    }

    相关文章

      网友评论

          本文标题:halide编程技术指南(连载四)

          本文链接:https://www.haomeiwen.com/subject/rjmjphtx.html