为特殊的任务设计并发代码时,需要根据任务来考虑之前提到的问题。为了展示以上的注意事项如何应用,我们将看一下在C++标准库中三个标准函数的并行实现。
这里主要演示这些技术的实现,不过可能这些技术并不是最前沿的。更多优秀的实现可以更好的利用硬件并发,这可能需要到与并行算法相关的学术文献,或者是多线程的专家库中(比如:Inter的TBB[4])才能看到。
并行版的std::for_each
最能体现并行的概念,就让我们从它开始吧!
std::for_each
的原理很简单:其对某个范围中的元素,依次使用用户提供的函数。并行和串行的最大区别就是函数的调用顺序。std::for_each
是对范围中的第一个元素调用用户函数,接着是第二个,以此类推。在并行实现中对于每个元素的处理顺序就不能保证了。
为了实现这个函数的并行版本,需要对每个线程所处理的元素进行划分。事先知道元素数量,所以可以在处理前对数据进行划分(详见8.1.1节)。假设只有并行任务运行,就可以使用std::thread::hardware_concurrency()
来决定线程的数量。同样,这些元素都能独立的处理,可以使用连续的数据块来避免伪共享(详见8.2.3节)。
这里的算法类似于并行版的std::accumulate
(详见8.4.1节),不过比起计算每一个元素的加和,这里对每个元素仅仅使用了指定功能的函数。因为不需要返回结果,要将异常传递给调用者,就需要使用std::packaged_task
和std::future
对线程中的异常进行转移。
代码8.7 并行版std::for_each
template<typename Iterator,typename Func>
void parallel_for_each(Iterator first,Iterator last,Func f)
{
unsigned long const length=std::distance(first,last);
if(!length)
return;
unsigned long const min_per_thread=25;
unsigned long const max_threads=
(length+min_per_thread-1)/min_per_thread;
unsigned long const hardware_threads=
std::thread::hardware_concurrency();
unsigned long const num_threads=
std::min(hardware_threads!=0?hardware_threads:2,max_threads);
unsigned long const block_size=length/num_threads;
std::vector<std::future<void> > futures(num_threads-1); // 1
std::vector<std::thread> threads(num_threads-1);
join_threads joiner(threads);
Iterator block_start=first;
for(unsigned long i=0;i<(num_threads-1);++i)
{
Iterator block_end=block_start;
std::advance(block_end,block_size);
std::packaged_task<void(void)> task( // 2
[=]()
{
std::for_each(block_start,block_end,f);
});
futures[i]=task.get_future();
threads[i]=std::thread(std::move(task)); // 3
block_start=block_end;
}
std::for_each(block_start,last,f);
for(unsigned long i=0;i<(num_threads-1);++i)
{
futures[i].get(); // 4
}
}
代码结构与代码8.4差不多,不同在于future向量对std::future<void>
类型①变量进行存储,因为工作线程不会返回值,并且Lambda函数会对block_start到block_end上的任务②执行f函数(为了避免传入线程的构造函数③)。工作线程不需要返回值时,调用futures[i].get()④只为了检索工作线程的异常。如果不想把异常传递出去,可以省略这一步。
实现并行std::accumulate
时,使用std::async
会简化代码。同样,parallel_for_each也可以使用std::async
。
代码8.8 使用std::async
实现std::for_each
template<typename Iterator,typename Func>
void parallel_for_each(Iterator first,Iterator last,Func f)
{
unsigned long const length=std::distance(first,last);
if(!length)
return;
unsigned long const min_per_thread=25;
if(length<(2*min_per_thread))
{
std::for_each(first,last,f); // 1
}
else
{
Iterator const mid_point=first+length/2;
std::future<void> first_half= // 2
std::async(¶llel_for_each<Iterator,Func>,
first,mid_point,f);
parallel_for_each(mid_point,last,f); // 3
first_half.get(); // 4
}
}
和基于std::async
的parallel_accumulate(代码8.5)一样,因为不知道需要使用多少个线程,所以在运行时对数据进行迭代划分。像之前一样,将每一级的数据分成两部分,异步执行另外一部分②,剩下的部分就不能再进行划分了,所以直接运行这一部分③;这样就可以直接对std::for_each
①进行使用了。再次使用std::async
和std::future
的get()④对异常进行传播。
回到算法,函数需要对每一个元素执行同样的操作(这样的操作有很多种,初学者可能会想到std::count
和std::replace
),一个稍微复杂一些的例子就是使用std::find
。
接下来是std::find
,因为不需要对数据元素做处理,所以当第一个元素就满足查找标准时,就没有必要对剩余元素进行搜索了。所以,算法属性对于性能具有很大的影响,并且对并行实现有着直接的影响。这个算法是个很特别的例子,数据访问模式都会对代码的设计产生影响(详见8.3.2节)。该类中的另一些算法包括std::equal
和std::any_of
。
当你和妻子或搭档,在一个纪念盒中找寻一张老照片,当找到这张照片时,就不会再看另外的照片了。不过,你需要让其他人知道你已经找到照片了(比如,大喊一声“找到了!”),这样其他人就会停止搜索了。很多算法的特性就是要对每一个元素进行处理,所以没有办法像std::find
一样,一旦找到合适数据就停止执行。因此,需要设计代码对其进行使用——当得到想要的答案时,就中断其他任务的执行。
如果不中断其他线程,串行版本的性能可能会超越并行版,因为串行算法可以在找到匹配元素的时候停止搜索。如果系统能支持四个并发线程,那么每个线程就可以对总数据量的1/4进行检查,并且在我们的实现只需要单核完成的1/4的时间,就能完成对所有元素的查找。如果匹配的元素在第一个1/4块中,串行算法将会返回第一个。
中断其他线程的一个办法就是使用原子变量作为标识,处理过每一个元素后就对这个标识进行检查。如果标识设置了,就意味有线程找到了匹配元素,算法就可以停止并返回。用这种方式来中断线程就可以将没有处理的数据保持原样,并在更多的情况下,比串行运行性能上能提升很多。缺点就是,加载原子变量是一个很慢的操作,会阻碍线程的运行。
如何获取返回值和异常呢?有两个选择。可以使用一个future数组,使用std::packaged_task
来转移值和异常,主线程上对返回值和异常进行处理,或者使用std::promise
对工作线程上的最终结果直接进行设置,这完全依赖于怎么样处理工作线程上的异常。如果想停止第一个异常(即使还没有对所有元素进行处理),就可以使用std::promise
对异常和最终值进行设置。另外,如果想让其他工作线程继续查找,可以使用std::packaged_task
来存储异常,当线程没有找到匹配元素时异常将再次抛出。
这种情况下,我会选择std::promise
,因为其行为和std::find
更为接近。这里需要注意一下搜索的元素是不是在提供的搜索范围内。因此,在所有线程结束前,获取future上的结果。如果所要查找的值不在范围内,就会持续的等待下去。
代码8.9 并行find算法实现
template<typename Iterator,typename MatchType>
Iterator parallel_find(Iterator first,Iterator last,MatchType match)
{
struct find_element // 1
{
void operator()(Iterator begin,Iterator end,
MatchType match,
std::promise<Iterator>* result,
std::atomic<bool>* done_flag)
{
try
{
for(;(begin!=end) && !done_flag->load();++begin) // 2
{
if(*begin==match)
{
result->set_value(begin); // 3
done_flag->store(true); // 4
return;
}
}
}
catch(...) // 5
{
try
{
result->set_exception(std::current_exception()); // 6
done_flag->store(true);
}
catch(...) // 7
{}
}
}
};
unsigned long const length=std::distance(first,last);
if(!length)
return last;
unsigned long const min_per_thread=25;
unsigned long const max_threads=
(length+min_per_thread-1)/min_per_thread;
unsigned long const hardware_threads=
std::thread::hardware_concurrency();
unsigned long const num_threads=
std::min(hardware_threads!=0?hardware_threads:2,max_threads);
unsigned long const block_size=length/num_threads;
std::promise<Iterator> result; // 8
std::atomic<bool> done_flag(false); // 9
std::vector<std::thread> threads(num_threads-1);
{ // 10
join_threads joiner(threads);
Iterator block_start=first;
for(unsigned long i=0;i<(num_threads-1);++i)
{
Iterator block_end=block_start;
std::advance(block_end,block_size);
threads[i]=std::thread(find_element(), // 11
block_start,block_end,match,
&result,&done_flag);
block_start=block_end;
}
find_element()(block_start,last,match,&result,&done_flag); // 12
}
if(!done_flag.load()) //13
{
return last;
}
return result.get_future().get(); // 14
}
代码8.9中的函数体与之前的例子相似。这次,由find_element类①的函数调用操作实现,循环通过在给定数据块中的元素,检查每一步的标识②。如果匹配的元素被找到,就将最终的结果设置到promise③当中,并且在返回前对done_flag④进行设置。
如果有异常抛出,通用处理代码⑤就会将其捕获,并且在promise⑥尝试存储前对done_flag进行设置。如果对应promise已设置,可能会抛出一个异常,所以这里⑦发生的任何异常都可以捕获并丢弃。
当线程调用find_element查询一个值,或者抛出一个异常时,如果其他线程看到done_flag已设置,其他线程会终止。如果多线程同时找到匹配值或抛出异常,将会产生竞争。不过,这是良性的条件竞争,成功的竞争者会作为“第一个”返回线程。
回到parallel_find函数本身,其拥有用来停止搜索的promise⑧和标识⑨。随着对元素的查找⑪,promise和标识会传递到新线程中。主线程也使用find_element对剩下的元素进行查找⑫。像之前提到的,需要在全部线程结束前,对结果进行检查,因为结果可能是任意位置上的匹配元素。将“启动-汇入”代码放在一个块中⑩,所有线程都会在找到匹配元素时⑬进行汇入。如果找到匹配元素,就可以调用std::future<Iterator>
(来自promise⑭)的成员函数get()来获取返回值或异常。
不过,假设使用硬件上所有可用的的线程,或使用其他机制对线程上的任务进行提前划分,可以使用std::async
,以及递归数据划分的方式来简化实现(同时使用C++标准库中提供的自动缩放工具)。使用std::async
的parallel_find实现如下所示。
代码8.10 使用std::async
实现的并行find算法
template<typename Iterator,typename MatchType> // 1
Iterator parallel_find_impl(Iterator first,Iterator last,MatchType match,
std::atomic<bool>& done)
{
try
{
unsigned long const length=std::distance(first,last);
unsigned long const min_per_thread=25; // 2
if(length<(2*min_per_thread)) // 3
{
for(;(first!=last) && !done.load();++first) // 4
{
if(*first==match)
{
done=true; // 5
return first;
}
}
return last; // 6
}
else
{
Iterator const mid_point=first+(length/2); // 7
std::future<Iterator> async_result=
std::async(¶llel_find_impl<Iterator,MatchType>, // 8
mid_point,last,match,std::ref(done));
Iterator const direct_result=
parallel_find_impl(first,mid_point,match,done); // 9
return (direct_result==mid_point)?
async_result.get():direct_result; // 10
}
}
catch(...)
{
done=true; // 11
throw;
}
}
template<typename Iterator,typename MatchType>
Iterator parallel_find(Iterator first,Iterator last,MatchType match)
{
std::atomic<bool> done(false);
return parallel_find_impl(first,last,match,done); // 12
}
如果想要在找到匹配项时结束,就需要在线程间设置标识,来表明匹配项已找到。因此,需要将这个标识递归传递。通过函数①的方式来实现,只需要增加一个参数done标识的引用,用来表示通过主入口点传入⑫。
核心实现和之前的代码一样。函数的实现中,会让单个线程处理最少的数据项②。如果数据块大小不足以分成两半,就要让当前线程完成所有的工作③。实际算法在一个简单的循环中,直到在循环到指定范围中的最后一个,或找到匹配项,并对标识进行设置④。如果找到匹配项,标识done就会在返回前进行设置⑤。无论是因为已经查找到最后一个,还是因为其他线程对done进行了设置,查找都会停止。如果没有找到,会将最后一个元素进行返回⑥。
如果给定范围可以进行划分,首先要在st::async
在对第二部分进行查找⑧前,要找数据中点⑦,而且需要使用std::ref
将done以引用的方式传递。同时,可以通过对第一部分直接进行递归查找。两部分都为异步,并且在原始范围过大时,直接递归查找的部分可能会再细化。
如果查找返回的是mid_point,这就意味着没有找到匹配项,所以就要从异步查找中获取结果。如果在另一半中没有匹配项的话,返回的结果就一定是last,这个值的返回就代表了没有找到匹配的元素⑩。如果“异步”调用延迟(非真正的异步),实际上这里会运行get()。这种情况下,如果对下半部分的元素搜索成功,就不会执行对上半部分元素的搜索了。如果异步查找真实的运行在其他线程上,那么async_result的析构函数将会等待该线程完成,所以这里不会有线程泄露。
std::async
可以用来提供“异常-安全”和“异常-传播”特性。如果直接递归抛出异常,future的析构函数就能让异步执行的线程提前结束。如果异步调用抛出异常,这个异常将会通过对get()成员函数的调用进行传播⑩。使用try/catch块只能捕捉在done时发生的异常,并且有异常抛出⑪时,所有线程都能很快的终止。不过,不使用try/catch的实现依旧没问题,就是要等待所有线程的工作完成。
实现中一个重要的特性,就是不能保证std::find
能串行处理所有数据。其他并行算法可以借鉴这个特性,因为让一个算法并行起来这是必须的特性。如果有顺序问题,元素就不能并发了。如果每个元素独立,虽然对于parallel_for_each不是很重要,不过对于parallel_find,即使在开始部分已经找到了匹配元素,也有可能返回范围中最后一个元素。
OK,现在已经使用了并行化的std::find
。如在本节开始说的那样,其他相似算法不需要对每一个数据元素进行处理,并且同样的技术可以使用到这些类似的算法上去。
为了完成并行“三重奏”,我们将换一个角度来看一下std::partial_sum
。对于这个算法,没有太多的文献可参考,不过让这个算法并行起来是一件很有趣的事。
std::partial_sum
会计算给定范围中的每个元素,并用计算后的结果将原始序列中的值替换掉。比如,有一个序列[1,2,3,4,5],执行该算法后会成为:[1,3(1+2),6(1+2+3),10(1+2+3+4),15(1+2+3+4+5)]。让这样一个算法并行起来会很有趣,因为这里不能讲任务分块,对每一块进行独立的计算。比如,原始序列中的第一个元素需要加到后面的一个元素中去。
确定某个范围部分和的一种的方式,就是在独立块中计算部分和,然后将第一块中最后的元素的值,与下一块中的所有元素进行相加,依次类推。如果有个序列[1,2,3,4,5,6,7,8,9],然后将其分为三块,那么在第一次计算后就能得到[{1,3,6},{4,9,15},{7,15,24}]。然后将6(第一块的最后一个元素)加到第二个块中,那么就得到[{1,3,6},{10,15,21},{7,15,24}]。然后再将第二块的最后一个元素21加到第三块中去,就得到[{1,3,6},{10,15,21},{28,36,55}]。
将原始数据分割成块,加上之前块的部分和就能够并行了。如果每个块中的末尾元素都是第一个被更新的,那么块中其他的元素就能被其他线程所更新,同时另一个线程对下一块进行更新,等等。当处理的元素比处理核心的个数多的时候,这样完成工作没问题,因为每一个核芯在每一个阶段都有合适的数据可以进行处理。
如果有很多的处理器(比处理的元素个数多),之前的方式就无法正常工作了。如果还是将工作划分给每个处理器,第一步就没必要去做了。这种情况下,传递结果就意味着让处理器进行等待,这时需要给这些处于等待中的处理器一些工作。所以,可以采用完全不同的方式来处理这个问题。比起将数据块中的最后一个元素的结果向后面的元素块传递,可以对部分结果进行传播:第一次与相邻的元素(距离为1)相加和(和之前一样),之后和距离为2的元素相加,后来和距离为4的元素相加,以此类推。比如:初始序列为[1,2,3,4,5,6,7,8,9],第一次后为[1,3,5,7,9,11,13,15,17],第二次后为[1,3,6,10,14,18, 22,26,30],下一次就要隔4个元素了。第三次后[1, 3, 6, 10, 15, 21, 28, 36, 44],下一次就要隔8个元素了。第四次后[1, 3, 6, 10, 15, 21, 28, 36, 45],这就是最终的结果。比起第一种方法多了很多步骤,不过在可并发平台下,这种方法提高了并行的可行性,每个处理器可在每一步中处理一个数据项。
总体来说,当有N个操作时(每步使用一个处理器)第二种方法需要log(N)[底为2]步。在本节中,N就相当于数据链表的长度。比起第一种,每个线程对分配块做N/k个操作,然后在做N/k次结果传递(这里的k是线程的数量)。因此,第一种方法的时间复杂度为O(N),不过第二种方法的时间复杂度为Q(Nlog(N))。当数据量和处理器数量相近时,第二种方法需要每个处理器上log(N)个操作,因为需要对结果进行传递,第一种方法中每个处理器上执行的操作数会随着k的增加而增多。对于处理单元较少的情况,第一种方法会比较合适。对于大规模并行系统,第二种方法比较合适。
不管怎么样,先将效率问题放一边,让我们来看一些代码。下面的代码实现的,就是第一种方法。
代码8.11 使用划分的方式进行并行求和
template<typename Iterator>
void parallel_partial_sum(Iterator first,Iterator last)
{
typedef typename Iterator::value_type value_type;
struct process_chunk // 1
{
void operator()(Iterator begin,Iterator last,
std::future<value_type>* previous_end_value,
std::promise<value_type>* end_value)
{
try
{
Iterator end=last;
++end;
std::partial_sum(begin,end,begin); // 2
if(previous_end_value) // 3
{
value_type& addend=previous_end_value->get(); // 4
*last+=addend; // 5
if(end_value)
{
end_value->set_value(*last); // 6
}
std::for_each(begin,last,[addend](value_type& item) // 7
{
item+=addend;
});
}
else if(end_value)
{
end_value->set_value(*last); // 8
}
}
catch(...) // 9
{
if(end_value)
{
end_value->set_exception(std::current_exception()); // 10
}
else
{
throw; // 11
}
}
}
};
unsigned long const length=std::distance(first,last);
if(!length)
return last;
unsigned long const min_per_thread=25; // 12
unsigned long const max_threads=
(length+min_per_thread-1)/min_per_thread;
unsigned long const hardware_threads=
std::thread::hardware_concurrency();
unsigned long const num_threads=
std::min(hardware_threads!=0?hardware_threads:2,max_threads);
unsigned long const block_size=length/num_threads;
typedef typename Iterator::value_type value_type;
std::vector<std::thread> threads(num_threads-1); // 13
std::vector<std::promise<value_type> >
end_values(num_threads-1); // 14
std::vector<std::future<value_type> >
previous_end_values; // 15
previous_end_values.reserve(num_threads-1); // 16
join_threads joiner(threads);
Iterator block_start=first;
for(unsigned long i=0;i<(num_threads-1);++i)
{
Iterator block_last=block_start;
std::advance(block_last,block_size-1); // 17
threads[i]=std::thread(process_chunk(), // 18
block_start,block_last,
(i!=0)?&previous_end_values[i-1]:0,
&end_values[i]);
block_start=block_last;
++block_start; // 19
previous_end_values.push_back(end_values[i].get_future()); // 20
}
Iterator final_element=block_start;
std::advance(final_element,std::distance(block_start,last)-1); // 21
process_chunk()(block_start,final_element, // 22
(num_threads>1)?&previous_end_values.back():0,
0);
}
这个实现中使用的结构体和之前一样,将问题进行分块解决,每个线程处理最小的数据块⑫。其中,有一组线程⑬和一组promise⑭用来存储每块中的最后一个值,并且实现中还有一组future⑮,用来对前一块中的最后一个值进行检索。可以为future⑯进行存储,以避免生成新线程时再分配。
主循环和之前一样,不过这次是让迭代器指向了每个数据块的最后一个元素,而不是作为普通值传递到最后⑰,这样就方便向其他块传递当前块的最后一个元素了。实际处理是在process_chunk函数对象中完成,这个结构体看上去不是很长。当前块的开始和结束迭代器和前块中最后一个值的future一起,作为参数进行传递,并且promise用来保留当前范围内最后一个值的原始值⑱。
生成新线程后,就对开始块ID进行更新,别忘了传递最后一个元素⑲,并且将当前块的最后一个元素存储到future,上面的数据将在循环中再次使用到⑳。
处理最后一个数据块前,需要获取之前数据块中最后一个元素的迭代器(21),这样就可以将其作为参数传入process_chunk(22)中了。std::partial_sum
不会返回一个值,所以在最后一个数据块被处理后,就不用再做任何事情了。当所有线程的操作完成时,求部分和的操作也就算完成了。
来看一下process_chunk函数对象①。对于整块的处理始于对std::partial_sum
的调用,包括对于最后一个值的处理②,需要了解当前块是否是第一块③。如果不是第一块,就会有previous_end_value值从前面的块传过来,所以需要等待这个值的产生④。为了将算法的并行最大化,需要对最后一个元素进行更新⑤,这就能将值传递给下一个数据块(如果有下一个数据块的话)⑥。当完成这个操作,就可以使用std::for_each
和简单的Lambda函数⑦对剩余的数据项进行更新。
如果previous_end_value值为空,当前数据块就是第一个数据块,所以只需要为下一个数据块更新end_value⑧。
最后,如果有操作抛出异常可以将其捕获⑨,并且存入promise⑩,在下一个数据块尝试获取前一个数据块的最后一个值④时,异常会再次抛出。处理最后一个数据块时,异常会重新抛出⑪,因为抛出动作会在主线程上进行。
因为线程间需要同步,这里就不容易使用std::async
重写了。任务等待会让线程中途去执行其他任务,所以所有的任务必须同时执行。
基于块的方法就介绍到这里,让我们来看一下第二种计算方式。
实现以2的幂级数为距离的算法
第二种算法通过增加距离的方式,让更多的处理器充分发挥作用。这种情况下,没有同步的必要了,因为所有中间结果都直接传递到下一个处理器上去了。实际中我们很少见到单个处理器处理对一定数量的元素执行同一条指令,这种方式称为单指令-多数据流(SIMD)。因此,代码必须能处理通用情况,并且需要对线程进行显式同步。
完成这种功能的一种方式是使用栅栏(barrier):只有所有线程都到达栅栏处,才能进行之后的操作,先到达的线程必须等待未到达的线程。C++11标准库没有直接提供这样的工具,所以需要自行设计。
试想游乐场中的过山车。如果有适量的游客在等待,过山车管理员就要保证,在过山车启动前,每一个位置都得坐一个游客。栅栏的工作原理也一样:已知了“座位”的数量,线程就是要等待所有“座位”都坐满。当等待线程够数,它们可以继续运行。这时,栅栏会重置,并且会让下一拨线程等待。通常会在循环中这样做,当同一个线程再次到达栅栏处,它会再次等待。这种方法是为了让线程同步,所以不会有线程在其他未完成的情况下,去完成下一个任务。如果有线程提前执行,这样的算法就是一场灾难,因为提前出发的线程可能会修改要被其他线程使用到的数据,后面线程获取到的数据就不正确了。
下面的代码就简单的实现了一个栅栏。
代码8.12 简单的栅栏类
class barrier
{
unsigned const count;
std::atomic<unsigned> spaces;
std::atomic<unsigned> generation;
public:
explicit barrier(unsigned count_): // 1
count(count_),spaces(count),generation(0)
{}
void wait()
{
unsigned const my_generation=generation; // 2
if(!--spaces) // 3
{
spaces=count; // 4
++generation; // 5
}
else
{
while(generation==my_generation) // 6
std::this_thread::yield(); // 7
}
}
};
这个实现中,用一定数量的“座位”构造了barrier①,这个数量将会存储count变量中。起初,栅栏中的spaces与count数量相当。当有线程都在等待时,spaces的数量就会减少③。当spaces的数量减到0时,spaces的值将会重置为count④,并且generation变量会增加,向线程发出信号,让这些等待线程能够继续运行⑤。如果spaces没有到达0,则线程会继续等待。这个实现使用了简单的自旋锁⑥,对generation的检查会在wait()开始时进行②。因为generation只会在所有线程都到达栅栏的时候更新⑤,所以在等待的时候使用yield()⑦就不会让CPU处于忙等待。
这个实现比较“简单”:使用自旋等待的情况下,如果让线程等待很长时间就不会很理想,并且如果超过count数量的线程对wait()进行调用,这个实现就没有办法工作了。如果想要很好的处理这样的情况,必须使用一个更加健壮(更加复杂)的实现。我依旧坚持对原子变量操作顺序的一致性,因为这会让事情更加简单,有时还是需要放松这样的约束。全局同步对于大规模并行架构来说消耗巨大,因为相关处理器会穿梭于存储栅栏状态的缓存行中(可见8.2.2中对乒乓缓存的讨论),所以需要格外小心,来确保使用的是最佳同步方法。如果支持并发技术规范扩展,这里就可以使用std::experimental::barrier
(如第4章所述)。
不论怎么样,都需要有固定数量的线程执行同步循环,大多数情况下线程数量都是固定的。代码起始部分的几个数据项,只需要几步就能得到其最终值。这就意味着,无论是让所有线程循环处理范围内的所有元素,还是让栅栏来同步线程,都会递减count的值。我会选择后者(栅栏),能避免线程做不必要的工作,并且仅仅需要等待最终步骤完成。
要将count改为一个原子变量,这样在多线程对其进行更新的时候,就不需要添加额外的同步:
std::atomic<unsigned> count;
初始化保持不变,不过当spaces的值被重置后,需要显式的对count进行load()操作:
spaces=count.load();
这就是要对wait()函数的改动。现在需要一个新的成员函数来递减count,这个函数命名为done_waiting(),当一个线程完成其工作并在等待时才能调用它:
void done_waiting()
{
--count; // 1
if(!--spaces) // 2
{
spaces=count.load(); // 3
++generation;
}
}
实现中,首先要减少count①,所以spaces下一次将会置为一个较小的数。然后,需要递减spaces的值②。如果不做这些操作,有些线程将会持续等待,因为spaces被旧的count初始化,从而大于期望值。一组当中最后一个线程需要对计数器进行重置,并且递增generation的值③,就像在wait()里面做的那样。最重要的区别:最后一个线程不需要等待。当最后一个线程结束,整个等待也就随之结束!
现在就准备开始写部分和的第二个实现吧。每一步中的每一个线程都在栅栏处调用wait(),从而保证线程所处步骤的一致性,并且当所有线程都结束,最后一个线程会调用done_waiting()来减少count的值。如果使用两个缓存对原始数据进行保存,栅栏也可以提供所需要的同步。每一步中线程都会从原始数据或缓存中读取数据,并且将新值写入对应位置。如果线程先从原始数据处获取数据,下一步就从缓存上获取数据(或相反)。这就能保证读与写都是由独立线程完成,并不存在条件竞争。当线程结束等待循环,就能保证正确的值写入到原始数据当中。
代码8.13 通过两两更新的方式实现partial_sum
struct barrier
{
std::atomic<unsigned> count;
std::atomic<unsigned> spaces;
std::atomic<unsigned> generation;
barrier(unsigned count_):
count(count_),spaces(count_),generation(0)
{}
void wait()
{
unsigned const gen=generation.load();
if(!--spaces)
{
spaces=count.load();
++generation;
}
else
{
while(generation.load()==gen)
{
std::this_thread::yield();
}
}
}
void done_waiting()
{
--count;
if(!--spaces)
{
spaces=count.load();
++generation;
}
}
};
template<typename Iterator>
void parallel_partial_sum(Iterator first,Iterator last)
{
typedef typename Iterator::value_type value_type;
struct process_element // 1
{
void operator()(Iterator first,Iterator last,
std::vector<value_type>& buffer,
unsigned i,barrier& b)
{
value_type& ith_element=*(first+i);
bool update_source=false;
for(unsigned step=0,stride=1;stride<=i;++step,stride*=2)
{
value_type const& source=(step%2)? // 2
buffer[i]:ith_element;
value_type& dest=(step%2)?
ith_element:buffer[i];
value_type const& addend=(step%2)? // 3
buffer[i-stride]:*(first+i-stride);
dest=source+addend; // 4
update_source=!(step%2);
b.wait(); // 5
}
if(update_source) // 6
{
ith_element=buffer[i];
}
b.done_waiting(); // 7
}
};
unsigned long const length=std::distance(first,last);
if(length<=1)
return;
std::vector<value_type> buffer(length);
barrier b(length);
std::vector<std::thread> threads(length-1); // 8
join_threads joiner(threads);
Iterator block_start=first;
for(unsigned long i=0;i<(length-1);++i)
{
threads[i]=std::thread(process_element(),first,last, // 9
std::ref(buffer),i,std::ref(b));
}
process_element()(first,last,buffer,length-1,b); // 10
}
代码的整体结构应该不用说了。process_element类有函数调用操作可以用来做具体的工作①,就是运行一组线程⑨,并将线程存储到vector中⑧,同样还需要在主线程中对其进行调用⑩。这里与之前最大的区别就是,线程的数量是根据列表中的数据量来定的,而不是根据std::thread::hardware_concurrency
。如之前所说,除非使用的是一个大规模并行的机器,因为这上面的线程都十分廉价(虽然这样的方式并不是很好),还能为我们展示了其整体结构。这个结构在有较少线程的时候,每一个线程只能处理源数据中的部分数据。
不管怎样,主要的工作都是调用process_element的函数操作符来完成的。每一步,都会从原始数据或缓存中获取第i个元素②,并且将获取到的元素加到指定stride的元素中去③,如果从原始数据开始读取的元素,加和后的数需要存储在缓存中④,在开始下一步前会在栅栏处等待⑤。当stride超出了给定数据的范围,最终结果已经存在缓存中时,就需要更新原始数据中的数据,同样这也意味着本次加和结束。最后,再调用栅栏中的done_waiting()函数⑦。
注意这个解决方案并不是异常安全的。如果某个线程在process_element执行时抛出一个异常,就会终止整个应用。可以使用一个std::promise
来存储异常,就像在代码8.9中parallel_find的实现,或仅使用互斥量保护的std::exception_ptr
即可。
总结下这三个例子。希望其能让我们了解8.1、8.2、8.3和8.4节中提到的设计考量,并且证明了这些技术在真实的代码中需要承担哪些责任。