一道电面题目, 分为两问:
- 设计一个系统, 不断接收数据包(数据内容可以简单想成一个int值). 给定常量M, 要求从所有获取的数据中随机抽样M个, 每个样本被抽取的概率相等.
- 如果已接收数据包的数量还未超过M个, 则将它们全部返回.
- 接收数据包的总量是未知的, 可能非常大.
- 机器的存储空间有限, 无法存储所有数据包. 但是存储M个数据包还是绰绰有余的.
- 假如有K台机器, 如何将第一问的算法做成分布式的, 以最大化吞吐量?
第一问是一个标准的水塘抽样算法(Reservoir Sampling)问题.
class System {
private:
vector<int> v;
int capacity;
int cnt;
public:
System(int capacity) : capacity(capacity), cnt(0) {}
void put(int n) {
++cnt;
if (cnt <= capacity) {
v.push_back(n);
return;
}
int index = rand() % cnt;
if (index < capacity) {
v[index] = n;
}
}
vector<int> get() {
return v;
}
};
算法思路:
维护一个大小为M
的数组. 记当前接收的是第N
个数据(从1
开始).
- 如果
N<=M
, 直接插入 - 如果
N>M
, 就取一个1~N
之间的随机数index
. 如果index
在1~M
之间, 则用新接收的数据替换第index
个数据; 否则丢弃.
证明:
假设当前是第M+1
个元素, 它被丢弃的概率是1/(M+1)
, 留下的概率就是M/(M+1)
. 对于已经在集合中的M
个元素, 每个以1/(M+1)
的概率被丢弃, 留下的概率也是M/(M+1)
.
假设当前是第M+2
个元素, 它被丢弃的概率是2/(M+2)
, 留下的概率是M/(M+2)
. 对于前M+1
个元素, 它们在集合中的概率是M/(M+1)
(见上一个分析). 这一次, 它们每个被以1/(M+2)
的概率被丢弃, 留下的概率就是 M/(M+1) * (M+1)/(M+2) = M/(M+2)
依次类推, 到接受第N
个元素时, 每个元素被抽取的概率就是M/N
.
第二问就是分布式的蓄水池抽样问题了.
算法思路是:
假设有K
个机器, 每个机器维护大小为M
的数组, 并记录该机器接受的数据总数Ni
.
- 当机器获取新数据时, 进行单机的蓄水池抽样.
- 当进行采样时, 重复
M
次以下操作:
取随机数d
在[0,1)
之间, 记N=Sum(Ni | i=1...K)
- 若
d<N1/N
则从第一个机器上等概率抽取一个元素. - 若
N1/N<=d<(N1+N2)/N
则从第二个机器上等概率抽取一个元素 - 依此类推.
- 若
假设Ni>M
:
因为第i
个机器上数据的留存概率为M/Ni
, 而采样时又以Ni/N
的概率抽取该机器, 又以1/M
的概率等概率不放回地选取一个元素, 所以第i
个机器上一个数据被抽中的概率为M/Ni * Ni/N * 1/M = 1/N
. 这样重复M
次, 每个元素被抽取到的概率就是M/N
.
假设Ni<=M
第i
个机器上数据的留存概率为1
, 采样时以Ni/N
的概率抽取该机器, 又以1/Ni
的概率等概率不放回地选取一个元素, 所以第i
个机器上一个数据被抽中的概率为1/N
. 同样, 重复M
次让每个元素被抽取到的概率为M/N
.
网友评论