逅弈 转载请注明原创出处,谢谢!
上篇文章,我描述了如何“设计一个大文件存储系统”,这篇文章,我将对核心的架构和实现做下详细的描述。
系统的架构如下图所示:
antfs.png整个系统主要有三个部分组成:
- QueenAnt:蚁后。负责管理WorkerAnt和对外接收文件存储、恢复的命令,并向WorkerAnt下发相关指令
- WorkerAnt:工蚁。负责具体的文件块的存储和恢复,以及和其他Worker之间的数据同步
- ZooKeeper:注册中心。Queen和Worker注册到ZK,Worker自动发现Queen并进行heartbeat,Worker自动发现其他Worker并进行数据同步
系统的执行流程如下:
1.启动一个Zk集群,或使用已存在的Zk
2.启动Queen,将自己注册到zk上去,路径为/queen,节点类型为临时节点
3.启动Worker,将字节注册到zk上去,路径为/root/worker,节点类型为临时顺序节点
4.Worker到/queen下获取Queen节点,通过netty与Queen进行heartbeat
5.Client调用Queen,进行文件的store或者restore操作
6.Queen将指令转发到具体的Worker
7.Worker进行相关的任务执行,并按条件进行数据的同步
其中,各组件之间通过自定义协议进行通讯,定义一个数据包Packet,如下:
public class Packet {
/** the default magic number */
public static final byte MAGIC = 0x13;
/** the heart beat header */
public static final Header HEART_BEAT_HEADER = new Header(MAGIC,MsgType.HEARTBEAT.getVal(),0);
/** the heart beat packet */
public static final Packet HEART_BEAT_PACKET = new Packet(HEART_BEAT_HEADER,null);
/** the header size */
public static final int HEADER_SIZE = 6;
private Header header;
private String body;
// 省略get、set
public static class Header{
/** magic number */
private byte magic;
/** the message type */
private byte msgType;
/** packet body length */
private int len;
// 省略get、set
}
}
MsgType定义如下:
public enum MsgType {
/**
* heart beat
* from worker to queen
*/
HEARTBEAT((byte)0x00),
/**
* file store
* from client to queen
* from queen to worker
*/
STORE((byte)0x03),
/**
* file restore
* from client to queen
* from queen to worker
*/
RESTORE((byte)0x04),
/**
* file chunk replication
* from worker to worker
*/
REPLICA((byte)0x10),
/**
* file meta sync
* from queen to worker
*/
SYNC((byte)0x11);
}
Worker与Queen进行heartbeat是通过Worker启动时,在EventLoop中调度一个定时任务实现,具体代码如下:
ChannelFuture future = bootstrap.bind(node.getPort()).sync();
LOGGER.info("WorkerServer Startup at port:{}", node.getPort());
Channel channel = future.channel();
// schedule a heartbeat runnable
channel.eventLoop().scheduleAtFixedRate(new HeartbeatClient(),0, Constants.HEART_BEAT_PERIOD,TimeUnit.SECONDS);
LOGGER.info("HeartbeatClient has scheduled");
HeartbeatClient实际是一个Runnable,具体的工作就是连接上Queen之后,向Queen发送HeartBeat的Packet,具体的核心代码如下:
public HeartbeatClient(){
if(discovery==null){
LOGGER.warn("discovery is null,can't get queenNode");
return;
}
// connect to Queen
connect();
}
@Override
public void run() {
heartBeat();
}
private void heartBeat() {
if(channel==null){
LOGGER.warn("channel is null,can't send a heart beat packet to queen");
return;
}
Packet packet = Packet.HEART_BEAT_PACKET;
LOGGER.debug("worker send heart beat packet to queen with packet={},remoteAddress={}",packet,channel.remoteAddress());
channel.writeAndFlush(packet);
}
整个系统的代码在github上:antfs,欢迎有兴趣的一起贡献。
更多原创好文,请关注「逅弈逐码」
网友评论