4. 关键算法
生成行车轨迹的算法描述:对于每个过车agent,会随机选择一个车牌,随机选择一个卡口作为起始卡口,生成一个行车轨迹。在所行驶的坐标范围内,在某个卡口点位可选择的行驶方向如下图所示:
图4
由图4可知,在不同的坐标可选择的行驶方向也有不同,可选择4个方向,也可能选择3个方向,也可能选择2个方向。行车轨迹生成算法需要根据当前卡口坐标,在候选方向集合中随机选择一个方向来模拟行车。
对于每个行车轨迹的长度,即经过的卡口点的数量,可随机生成,建议最大距离为行驶范围矩形对角线的长度。
行车轨迹模拟算法如下所示:
名称:模拟行车轨迹
输入参数:无
输出参数:无
过程:
//随机选择一个卡口
startTollgate = getRandomTollgate();
//随机选择一个车牌并锁定该车牌
plate = getRandomPlate();
//将起始过车写入过车数据流
motorvehicle = createMotorvehicle(startTollgate, plate);
writeMotorvehicleDatastream(motorvehicle);
//随机生成行驶长度
distance = getRandomDistance();
//随机生成行驶速度,在20~80中选个随机数
speed = getRandomSpeed();
//计算模拟的每条过车记录,并写入过车数据流中
currentTollgate = startTollgate;
For(int i=0; i< distance; i++){
//根据当前卡口能走的方向计算出随机行驶方向
direction = simuRandomDirection(currentTollgate);
//根据速度计算出经过下一卡口的过车时间
plate.lasttime = calcPassTime(speed);
//根据当前卡口和行驶方向计算出下一卡口
nextTollgate = calcNextTollgate(currentTollgate, direction);
//将模拟的过车写入过车数据流
motorvehicle = createMotorvehicle(nextTollgate, plate);
writeMotorvehicleDatastream(motorvehicle);
//进入下一次循环
currentTollgate = nextTollgate
}
5. 运行方式
目前我们的视图数据存储使用hdfs和hive,建表工具使用iceberg。因此我们写了一个flink job来写入到iceberg的过车表中。flink job中自定义生成过车数据的source,过车写入datastream中,最后写入为iceberg表的sink中。过车agent的source代码如下:
/**
* Title: CommonMTAgent.java
* Description: 通用过车agent
* author zhang.kai
* Copyright: Copyright (c) 2025
* Company: www.kedacom.com
* date: 2021年7月21日-下午2:37:32
* version: V1.0
*/
package com.kedacom.simumotor.agent;
import java.io.Serializable;
import java.util.Date;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import com.kedacom.simumotor.config.SimuConfig;
import com.kedacom.simumotor.pojo.Motorvehicle;
import com.kedacom.simumotor.pojo.Plate;
import com.kedacom.simumotor.pojo.SimuTollgate;
import com.kedacom.simumotor.pool.PoolManager;
import com.kedacom.simumotor.util.MotorVehicleUtils;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class CommonMTAgent implements Runnable, Serializable {
/**
*
*/
private static final long serialVersionUID = -183630777130253681L;
private SourceContext<RowData> sc;
public String name;
public CommonMTAgent() {
}
public CommonMTAgent(SourceContext<RowData> ctx) {
sc = ctx;
}
public void start() {
startSimuData();
}
@Override
public void run() {
log.info("************** plateLastAppearTime:{}",new Date(SimuConfig.plateLastAppearTime).toString());
long count = 0;
while (!SimuConfig.ifstop) {
try {
// 获取随机卡口和车牌
SimuTollgate tollgate = PoolManager.getRandomTollgate();
Plate plate = PoolManager.getRandomPlate();
// 生成行车轨迹
synchronized (plate) {
if (plate.getPlateLastTime() < System.currentTimeMillis()) {
plate.setPlateLastTime(System.currentTimeMillis());
}
// 在上次出现的时间上加上间隔
plate.setPlateLastTime(plate.getPlateLastTime() + SimuConfig.APPEAR_DELAY * 1000);
writeMotorvehicle2Stream(tollgate, plate);
count++;
// 生成随机轨迹
int pathLength = MotorVehicleUtils.getRandomPathLength();
for (int i = 0; i < pathLength; i++) {
tollgate = MotorVehicleUtils.getNextTollgate(tollgate);
if (null == tollgate) {
break;
}
MotorVehicleUtils.setNextAppearTime(plate);
writeMotorvehicle2Stream(tollgate, plate);
count++;
}
}
// 到达批量写入,进行计数
if (count >= SimuConfig.BATCH_WRITE_SIZE) {
log.info("batchWriteMv:{}", count);
// Thread.sleep(10000);
count = 0;
}
} catch (Exception e) {
log.error("XXXXXX CommonMTAgent run error", e);
}
}
}
/**
* 启动模拟数据线程
*/
private void startSimuData() {
Thread agentThread = new Thread(this);
agentThread.setName("SimuThread-" + name);
agentThread.start();
}
/**
* 将生成的motorvehicle写入数据流中
*
* @param tollgate
* @param plate
* @param metricqueue
*/
private void writeMotorvehicle2Stream(SimuTollgate tollgate, Plate plate) {
Motorvehicle mv = MotorVehicleUtils.getMotorvehicle(tollgate, plate);
RowData row = transformMv(mv);
sc.collect(row);
}
/**
* 将motorvehicle对象转换为
*
* @param mv
* @return
*/
private RowData transformMv(Motorvehicle mv) {
GenericRowData rowData = new GenericRowData(16);
long passtime = mv.getPassTime() / 1000;
int days = (int) (passtime / (60 * 60 * 24));
rowData.setField(0, days);
rowData.setField(1, MotorVehicleUtils.wrap(mv.getMotorVehicleID()));
rowData.setField(2, mv.getInfoKind());
rowData.setField(3, MotorVehicleUtils.wrap(mv.getTollgateId()));
rowData.setField(4, passtime);
rowData.setField(5, MotorVehicleUtils.wrap(mv.getDeviceId()));
rowData.setField(6, MotorVehicleUtils.wrap(mv.getStorageUrl1()));
rowData.setField(7, mv.getLeftTopX());
rowData.setField(8, mv.getLeftTopY());
rowData.setField(9, mv.getRightBtmX());
rowData.setField(10, mv.getRightBtmY());
rowData.setField(11, MotorVehicleUtils.wrap(mv.getHasPlate()));
rowData.setField(12, MotorVehicleUtils.wrap(mv.getPlateClass()));
rowData.setField(13, MotorVehicleUtils.wrap(mv.getPlateColor()));
rowData.setField(14, MotorVehicleUtils.wrap(mv.getPlateNo()));
rowData.setField(15, MotorVehicleUtils.wrap(mv.getVehicleColor()));
return rowData;
}
}
经过测试,在我们自己的flink集群上,每小时能够写入约2亿条过车数据,一天大约能够写入近50亿过车数据。
网友评论