摘抄自 https://www.xuebuyuan.com/2179173.html 学步园
PACKET_MMAP实现原理分析
2014年10月06日 ⁄ 综合 ⁄ 共 4737字 ⁄ 字号 小 中 大 ⁄ 评论关闭
PACKET_MMAP实现原理分析
samonr4l | 更新时间:2011-08-11 16:56:32 | 点击数:155自动刷新
PACKET_MMAP实现的代码都在net/packet/af_packet.c中,其中一些宏、结构等定义在include/linux/if_packet.h中。
PACKET_MMAP的实现原理
PACKET_MMAP在内核空间中分配一块内核缓冲区,然后用户空间程序调用mmap映射到用户空间。将接收到的skb拷贝到那块内核缓冲区中,这样用户空间的程序就可以直接读到捕捉的数据包了。
假如没有开启PACKET_MMAP,只是依靠AF_PACKET非常的低效。它有缓冲区的限制,并且每捕捉一个报文就需要一个系统调用,假如为了获得packet的时间戳就需要两个系统调用了(获得时间戳还需要一个系统调用,libpcap就是这样做的)。
PACKET_MMAP非常高效,它提供一个映射到用户空间的大小可配置的环形缓冲区。这种方式,读取报文只需要等待报文就可以了,大部分情况下不需要系统调用(实在poll也是一次系统调用)。通过内核空间和用户空间共享的缓冲区还可以起到减少数据拷贝的作用。
当然为了进步捕捉的性能,不仅仅只是PACKET_MMAP。假如你在捕捉一个高速网络中的数据,你应该检查NIC是否支持一些中断负载缓和机制或者是NAPI,确定开启这些措施。
PACKET_MMAP减少了系统调用,不用recvmsg就可以读取到捕捉的报文,相比原始套接字+recvfrom的方式,减少了一次拷贝和一次系统调用。
[setup]:
socket()------> 捕捉socket的创建 setsockopt()------> 环形缓冲区的分配 mmap()------> 将分配的缓冲区映射到用户空间中
[capture]
poll()------> 等待新进的报文
[shutdown]
close------> 销毁捕捉socket和所有相关的资源
接下来的这些内容,翻译自Document/networking/packet_mmap.txt,但是根据需要有所删减
假如mode设置为SOCK_RAW,链路层信息也会被捕捉;假如mode设置为SOCK_DGRAM,那么对应接口的链路层信息捕捉就不会被支持,内核会提供一个虚假的头部。
销毁socket和开释相关的资源,可以直接调用一个简单的close()系统调用就可以了。
struct tpacket_req
{
unsigned int tp_block_size; /* Minimal size of contiguous block */
unsigned int tp_block_nr; /* Number of blocks */
unsigned int tp_frame_size; /* Size of frame */
unsigned int tp_frame_nr; /* Total number of frames */
};
这个结构被定义在include/linux/if_packet.h中,在捕捉进程中建立一个不可交换(unswappable)内存的环形缓冲区。通过被映射的内存,捕捉进程就可以无需系统调用就可以访问到捕捉的报文和报文相关的元信息,像时间戳等。
捕捉frame被划分为多个block,每个block是一块物理上连续的内存区域,有tp_block_size/tp_frame_size个frame。block的总数是tp_block_nr。实在tp_frame_nr是多余的,由于我们可以计算出来:
每个frame必须放在一个block中,每个block保存整数个frame,也就是说一个frame不能跨越两个block。
- 映射和使用环形缓冲区
在用户空间映射缓冲区可以直接使用方便的mmap()函数。固然那些buffer在内核中是由多个block组成的,但是映射后它们在用户空间中是连续的。
假如tp_frame_size能够整除tp_block_size,那么每个frame都将会是tp_frame_size长度;假如不是,那么tp_block_size/tp_frame_size个frame之间就会有空隙,那是由于一个frame不会跨越两个block。
这里我们只关心前两个,TP_STATUS_KERNEL和TP_STATUS_USER。假如status为TP_STATUS_KERNEL,表示这个frame可以被kernel使用,实际上就是可以将存放捕捉的数据存放在这个frame中;假如status为TP_STATUS_USER,表示这个frame可以被用户空间使用,实际上就是这个frame中存放的是捕捉的数据,应该读出来。
内核将所有的frame的status初始化为TP_STATUS_KERNEL,当内核接受到一个报文的时候,就选一个frame,把报文放进往,然后更新它的状态为TP_STATUS_USER(这里假设不出现其他题目,也就是忽略其他的状态)。用户程序读取报文,一旦报文被读取,用户必须将frame对应的status设置为0,也就是设置为TP_STATUS_KERNEL,这样内核就可以再次使用这个frame了。
先检查状态值,然后再对frame进行轮循,这样就可以避免竞争条件了(假如status已经是TP_STATUS_USER了,也就是说在调用poll前已经有了一个报文到达。这个时候再调用poll,并且之后不再有新报文到达的话,那么之前的那个报文就无法读取了,这就是所谓的竞争条件)。
在libpcap-1.0.0中是这么设计的:
pcap-linux.c中的pcap_read_linux_mmap:
//假如frame的状态在poll前已经为TP_STATUS_USER了,说明已经在poll前已经有一个数据包被捕捉了,假如poll后不再有数据包被捕捉,那么这个报文不会被处理,这就是所谓的竞争情况。
if ((handle->md.timeout >= 0) && !pcap_get_ring_frame(handle, TP_STATUS_USER)) { struct pollfd pollinfo; int ret; pollinfo.fd = handle->fd; pollinfo.events = POLLIN; do { /* poll() requires a negative timeout to wait forever */ ret = poll(&pollinfo, 1, (handle->md.timeout
0)? handle->md.timeout: -1); if ((ret < 0) && (errno != EINTR)) { return -1; } ...... } while (ret < 0); }
//依次处理捕捉的报文
while ((pkts < max_packets) || (max_packets <= 0)) { ...... //假如frame的状态为TP_STATUS_USER就读出数据frame,否则就退出循环。留意这里是环形缓冲区 h.raw = pcap_get_ring_frame(handle, TP_STATUS_USER); if (!h.raw) break; ...... /* pass the packet to the user / pkts++; callback(user, &pcaphdr,
bp); handle->md.packets_read++; skip: / next packet */ switch (handle->md.tp_version) { case TPACKET_V1:
//重新设置frame的状态为TP_STATUS_KERNEL h.h1->tp_status = TP_STATUS_KERNEL;break; ...... } }
PACKET_MMAP源码分析
这里就不再像上一篇文章中那样大段大段的粘贴代码了,只是分析一下流程就可以了,需要的同学可以对照着follow一下代码;-)
数据包进进网卡后,创建了skb,之后会进进软中断处理,调用netif_receive_skb,并调用dev_add_pack注册的一些func。很明显可以看到af_packet.c中的tpacket_rcv和packet_rcv就是我们找的目标。
tpacket_rcv是PACKET_MMAP的实现,packet_rcv是普通AF_PACKET的实现。
tpacket_rcv:
- 进行些必要的检查
- 运行run_filter,通过BPF过滤中我们设定条件的报文,得到需要捕捉的长度snaplen
- 在ring buffer中查找TP_STATUS_KERNEL的frame
- 计算macoff、iphone wallpapersmurf village for pc netoff等信息
- 假如snaplen+macoff>frame_size,并且skb为共享的,那么就拷贝skb<一般不会拷贝>
if(skb_shared(skb))
skb_clone()
- 将数据从skb拷贝到kernel Buffer中<拷贝>
skb_copy_bits(skb, 0, h.raw+macoff, snaplen);
- 设置拷贝到frame中报文的头部信息,包括时间戳、长度、状态等信息
- flush_dcache_page()把某页在data cache中的内容同步回内存。
x86应该不用这个,这个多为RISC架构用的
- 调用sk_data_ready,通知睡眠进程,调用poll
- 应用层在调用poll返回后,就会调用pcap_get_ring_frame获得一个frame进行处理。这里面没有拷贝也没有系统调用。
packet_rcv:
- 进行些必要的检查
- 运行run_filter,通过BPF过滤中我们设定条件的报文,得到需要捕捉的长度snaplen
- 假如skb为共享的,那么就拷贝skb<一般都会拷贝>
if(skb_shared(skb))
skb_clone()
- 设置拷贝到frame中报文的头部信息,包括时间戳、长度、状态等信息
- 将skb追加到socket的sk_receive_queue中
- 调用sk_data_ready,通知睡眠进程有数据到达
7.应用层睡眠在recvfrom上,当数据到达,socket可读的时候,调用packet_recvmsg,其中将数据拷贝到用户空间。<拷贝> skb_recv_datagram()从sk_receive_queue中获得skb skb_copy_datagram_iovec()将数据拷贝到用户空间
注:实在在packet处理之前还有一次拷贝过程,在NIC Driver中,创建一个skb,然后NIC把数据DMA到skb的data中。
在另外一些ZeroCopy实现中(例如ntz),假如不希看NIC数据进进协议栈的话,就可以不用考虑skb_shared的题目了,直接将数据从NIC Driver中DMA到制定的一块内存,然后使用mmap到用户空间。这样就只有一次DMA过程,当然DMA也是一种拷贝;-)
关于数据包如何从NIC Driver到packet_rcv/tpacket_rcv,数据包经过中断、软中断等处理,进进netif_receive_skb中对skb进行分发,就会调用dev_add_pack注册的packet_type->func。
关于数据包接受的流程可以阅读一些关于NAPI等相关的资料:
上述内容摘抄自 https://www.xuebuyuan.com/2179173.html 学步园
-----------
内核TPACKET_V3 / V2 / V1三个版本的AF_PACKET
-
struct packet_sock *po
https://elixir.bootlin.com/linux/v5.4-rc2/source/net/packet/af_packet.c#L4266
static int packet_set_ring(struct sock *sk, union tpacket_req_u *req_u,
int closing, int tx_ring)
{
struct pgv *pg_vec = NULL;
struct packet_sock *po = pkt_sk(sk);
int was_running, order = 0;
struct packet_ring_buffer *rb;
struct sk_buff_head *rb_queue;
__be16 num;
int err = -EINVAL;
/* Added to avoid minimal code churn */
struct tpacket_req *req = &req_u->req;
rb = tx_ring ? &po->tx_ring : &po->rx_ring;
rb_queue = tx_ring ? &sk->sk_write_queue : &sk->sk_receive_queue;
err = -EBUSY;
if (!closing) {
if (atomic_read(&po->mapped))
goto out;
if (packet_read_pending(rb))
goto out;
}
if (req->tp_block_nr) {
unsigned int min_frame_size;
/* Sanity tests and some calculations */
err = -EBUSY;
if (unlikely(rb->pg_vec))
goto out;
switch (po->tp_version) {
case TPACKET_V1:
po->tp_hdrlen = TPACKET_HDRLEN;
break;
case TPACKET_V2:
po->tp_hdrlen = TPACKET2_HDRLEN;
break;
case TPACKET_V3:
po->tp_hdrlen = TPACKET3_HDRLEN;
break;
}
err = -EINVAL;
if (unlikely((int)req->tp_block_size <= 0))
goto out;
if (unlikely(!PAGE_ALIGNED(req->tp_block_size)))
goto out;
min_frame_size = po->tp_hdrlen + po->tp_reserve;
if (po->tp_version >= TPACKET_V3 &&
req->tp_block_size <
BLK_PLUS_PRIV((u64)req_u->req3.tp_sizeof_priv) + min_frame_size)
goto out;
if (unlikely(req->tp_frame_size < min_frame_size))
goto out;
if (unlikely(req->tp_frame_size & (TPACKET_ALIGNMENT - 1)))
goto out;
rb->frames_per_block = req->tp_block_size / req->tp_frame_size;
if (unlikely(rb->frames_per_block == 0))
goto out;
if (unlikely(rb->frames_per_block > UINT_MAX / req->tp_block_nr))
goto out;
if (unlikely((rb->frames_per_block * req->tp_block_nr) !=
req->tp_frame_nr))
goto out;
err = -ENOMEM;
order = get_order(req->tp_block_size);
pg_vec = alloc_pg_vec(req, order);
if (unlikely(!pg_vec))
goto out;
switch (po->tp_version) {
case TPACKET_V3:
/* Block transmit is not supported yet */
if (!tx_ring) {
init_prb_bdqc(po, rb, pg_vec, req_u);
} else {
struct tpacket_req3 *req3 = &req_u->req3;
if (req3->tp_retire_blk_tov ||
req3->tp_sizeof_priv ||
req3->tp_feature_req_word) {
err = -EINVAL;
goto out_free_pg_vec;
}
}
break;
default:
break;
}
}
/* Done */
else {
err = -EINVAL;
if (unlikely(req->tp_frame_nr))
goto out;
}
/* Detach socket from network */
spin_lock(&po->bind_lock);
was_running = po->running;
num = po->num;
if (was_running) {
po->num = 0;
__unregister_prot_hook(sk, false);
}
spin_unlock(&po->bind_lock);
synchronize_net();
err = -EBUSY;
mutex_lock(&po->pg_vec_lock);
if (closing || atomic_read(&po->mapped) == 0) {
err = 0;
spin_lock_bh(&rb_queue->lock);
swap(rb->pg_vec, pg_vec);
rb->frame_max = (req->tp_frame_nr - 1);
rb->head = 0;
rb->frame_size = req->tp_frame_size;
spin_unlock_bh(&rb_queue->lock);
swap(rb->pg_vec_order, order);
swap(rb->pg_vec_len, req->tp_block_nr);
rb->pg_vec_pages = req->tp_block_size/PAGE_SIZE;
po->prot_hook.func = (po->rx_ring.pg_vec) ?
tpacket_rcv : packet_rcv;
skb_queue_purge(rb_queue);
if (atomic_read(&po->mapped))
pr_err("packet_mmap: vma is busy: %d\n",
atomic_read(&po->mapped));
}
mutex_unlock(&po->pg_vec_lock);
spin_lock(&po->bind_lock);
if (was_running) {
po->num = num;
register_prot_hook(sk);
}
spin_unlock(&po->bind_lock);
if (pg_vec && (po->tp_version > TPACKET_V2)) {
/* Because we don't support block-based V3 on tx-ring */
if (!tx_ring)
prb_shutdown_retire_blk_timer(po, rb_queue);
}
out_free_pg_vec:
if (pg_vec)
free_pg_vec(pg_vec, order, req->tp_block_nr);
out:
return err;
}
内核AF_PACKET数据结构体定义
struct packet_sock {
/* struct sock has to be the first member of packet_sock */
struct sock sk;
struct packet_fanout *fanout;
union tpacket_stats_u stats;
struct packet_ring_buffer rx_ring;
struct packet_ring_buffer tx_ring;
int copy_thresh;
spinlock_t bind_lock;
struct mutex pg_vec_lock;
unsigned int running; /* bind_lock must be held */
unsigned int auxdata:1, /* writer must hold sock lock */
origdev:1,
has_vnet_hdr:1,
tp_loss:1,
tp_tx_has_off:1;
int pressure;
int ifindex; /* bound device */
__be16 num;
struct packet_rollover *rollover;
struct packet_mclist *mclist;
atomic_t mapped;
enum tpacket_versions tp_version;
unsigned int tp_hdrlen;
unsigned int tp_reserve;
unsigned int tp_tstamp;
struct completion skb_completion;
struct net_device __rcu *cached_dev;
int (*xmit)(struct sk_buff *skb);
struct packet_type prot_hook ____cacheline_aligned_in_smp;
atomic_t tp_drops ____cacheline_aligned_in_smp;
};
static struct packet_sock *pkt_sk(struct sock *sk)
{
return (struct packet_sock *)sk;
}
setsockopt()函数相关内部实现
- https://elixir.bootlin.com/linux/v5.4-rc2/source/net/packet/af_packet.c#L3635
- https://github.com/torvalds/linux/blob/5bc52f64e8841c4526d74f1073bfa95d4f6224d4/net/packet/af_packet.c#L3635-L3694
int
packet_setsockopt(struct socket *sock, int level, int optname, char __user *optval, unsigned int optlen)
{
struct sock *sk = sock->sk;
struct packet_sock *po = pkt_sk(sk);
int ret;
if (level != SOL_PACKET)
return -ENOPROTOOPT;
switch (optname) {
case PACKET_ADD_MEMBERSHIP:
case PACKET_DROP_MEMBERSHIP:
{
struct packet_mreq_max mreq;
int len = optlen;
memset(&mreq, 0, sizeof(mreq));
if (len < sizeof(struct packet_mreq))
return -EINVAL;
if (len > sizeof(mreq))
len = sizeof(mreq);
if (copy_from_user(&mreq, optval, len))
return -EFAULT;
if (len < (mreq.mr_alen + offsetof(struct packet_mreq, mr_address)))
return -EINVAL;
if (optname == PACKET_ADD_MEMBERSHIP)
ret = packet_mc_add(sk, &mreq);
else
ret = packet_mc_drop(sk, &mreq);
return ret;
}
case PACKET_RX_RING:
case PACKET_TX_RING:
{
union tpacket_req_u req_u;
int len;
lock_sock(sk);
switch (po->tp_version) {
case TPACKET_V1:
case TPACKET_V2:
len = sizeof(req_u.req);
break;
case TPACKET_V3:
default:
len = sizeof(req_u.req3);
break;
}
if (optlen < len) {
ret = -EINVAL;
} else {
if (copy_from_user(&req_u.req, optval, len))
ret = -EFAULT;
else
ret = packet_set_ring(sk, &req_u, 0,
optname == PACKET_TX_RING);
}
release_sock(sk);
return ret;
}
case PACKET_COPY_THRESH:
{
int val;
if (optlen != sizeof(val))
return -EINVAL;
if (copy_from_user(&val, optval, sizeof(val)))
return -EFAULT;
pkt_sk(sk)->copy_thresh = val;
return 0;
}
case PACKET_VERSION:
{
int val;
if (optlen != sizeof(val))
return -EINVAL;
if (copy_from_user(&val, optval, sizeof(val)))
return -EFAULT;
switch (val) {
case TPACKET_V1:
case TPACKET_V2:
case TPACKET_V3:
break;
default:
return -EINVAL;
}
lock_sock(sk);
if (po->rx_ring.pg_vec || po->tx_ring.pg_vec) {
ret = -EBUSY;
} else {
po->tp_version = val;
ret = 0;
}
release_sock(sk);
return ret;
}
case PACKET_RESERVE:
{
unsigned int val;
if (optlen != sizeof(val))
return -EINVAL;
if (copy_from_user(&val, optval, sizeof(val)))
return -EFAULT;
if (val > INT_MAX)
return -EINVAL;
lock_sock(sk);
if (po->rx_ring.pg_vec || po->tx_ring.pg_vec) {
ret = -EBUSY;
} else {
po->tp_reserve = val;
ret = 0;
}
release_sock(sk);
return ret;
}
case PACKET_LOSS:
{
unsigned int val;
if (optlen != sizeof(val))
return -EINVAL;
if (copy_from_user(&val, optval, sizeof(val)))
return -EFAULT;
lock_sock(sk);
if (po->rx_ring.pg_vec || po->tx_ring.pg_vec) {
ret = -EBUSY;
} else {
po->tp_loss = !!val;
ret = 0;
}
release_sock(sk);
return ret;
}
case PACKET_AUXDATA:
{
int val;
if (optlen < sizeof(val))
return -EINVAL;
if (copy_from_user(&val, optval, sizeof(val)))
return -EFAULT;
lock_sock(sk);
po->auxdata = !!val;
release_sock(sk);
return 0;
}
off = ((uint8_t *) header) + (TPACKET_HDRLEN - sizeof(struct sockaddr_ll));
memcpy(off, pkt, pktlen);
以下为正文
AF_PACKET原始套接字的创建方法整理
用到的API函数:
- socket()
- bind()
- setsockopt()
- close()
宏定义常量包括:
PACKET_ADD_MEMBERSHIP
PACKET_AUXDATA
PACKET_FANOUT
SO_RCVBUF
数据结构包括:
struct sockaddr_ll bind_address;
struct packet_mreq sock_params;
流程
- 创建/关闭套接字
int fd = socket(AF_PACKET, SOCK_RAW, htons(ETH_P_ALL));
// ...
close(fd);
- 绑定网口
struct sockaddr_ll bind_address;
memset(&bind_address, 0, sizeof(bind_address));
bind_address.sll_family = AF_PACKET;
bind_address.sll_protocol = htons(ETH_P_ALL);
bind_address.sll_ifindex = AFPGetIfnumByDev(fd, devname, verbose);
bind(fd, (struct sockaddr *)&bind_address, sizeof(bind_address));
- PACKET_ADD_MEMBERSHIP 选项
struct packet_mreq sock_params;
setsockopt(fd, SOL_PACKET, PACKET_ADD_MEMBERSHIP,(void *)&sock_params, sizeof(sock_params));
- PACKET_AUXDATA 选项
int val = 1;
setsockopt(fd, SOL_PACKET, PACKET_AUXDATA, &val, sizeof(val));
- SO_RCVBUF 选项
setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &ptv->buffer_size, sizeof(ptv->buffer_size));
- PACKET_FANOUT 选项
uint32_t option=(mode << 16) | (id & 0xffff);
setsockopt(fd, SOL_PACKET, PACKET_FANOUT,(void *)&option, sizeof(option));
网友评论