![](https://img.haomeiwen.com/i7006392/7928c1d71dfcabb1.png)
1. 数据获取
1.1 控制节点
- 控制节点定时获取区块高度和区块时间
- 如果区块时间和当前时间超过一定阈值则发送告警
- 根据区块高度生成解析任务,并将解析任务的状态存到数据库(mysql)
- 比如本次区块高度是108,上次的区块高度是100,那么生成8个解析任务(101-108)
- 每个区块生成一个解析任务
- 根据拆分后的解析任务,发送请求给解析服务 请求参数大致为:解析id,blocknumber , datetime
- 解析服务收到请求后,直接返回success。
- 在解析完成后,则需要回调控制节点,将任务的解析状态同步给控制节点,控制节点则根据实际返回调整数据库中的解析任务状态
- 如果解析任务失败,则需要失败重试
1.2 解析任务状态表
字段 | 类型 | 解释 |
---|---|---|
id | int | 自增id |
block_nunber | bigint | 区块高度 |
datetime | timestamp | 区块时间 |
parse_node | timestamp | 解析节点 |
status | tinyint | 解析状态 |
1.3. 任务状态监控
- 任务失败告警
- 任务阻塞告警
2. kafka 数据
- 存储解析后的DEFI DEX、DEFI LENDING数据,avro/json/.. 格式
- 数据结构
大致类似于:
DEFI DEX 数据
blockchain,block_time,block_number,token_bought_amt,token_sold_amt,usd_amt,token_bought_address,token_sold_address,tx_from,tx_to,tx_hash
DEFI LENDING 数据
borrow_data / deposits_data / repayment_data
3. 数据流
3.1 ods 数据层
原样存储 kafka 里面的数据 并落到hive库
3.2 dw 数据层
- dwd 数据层
对ods 数据进行清洗和拆分后的数据 - dws 数据层
对dwd数据层的数据进行聚合之后的数据层(余额表、聪明钱地址表)。- 余额表
实际是对所有转账数据加总后的最后结果 - 聪明钱
计算trade表中每个钱包地址对应的win_rate和roi
下面是我再dune平台通过trade表计算sol链聪明钱的逻辑
with tx_detail as ( select tx_id, trader_id, case when token_bought_mint_address = 'So11111111111111111111111111111111111111112' then 'SELL' else 'BUY' end as tx_type, case when token_bought_mint_address = 'So11111111111111111111111111111111111111112' then token_sold_symbol else token_bought_symbol end as token_symbol, case when token_bought_mint_address = 'So11111111111111111111111111111111111111112' then token_bought_amount else token_sold_amount end as sol_token_amount, case when token_bought_mint_address = 'So11111111111111111111111111111111111111112' then token_sold_amount else token_bought_amount end as spl_token_amount, case when token_bought_mint_address = 'So11111111111111111111111111111111111111112' then token_sold_mint_address else token_bought_mint_address end as token_address, block_time from dex_solana.trades where block_time >= date_trunc('day', current_timestamp) - interval '30' day and ( token_bought_mint_address = 'So11111111111111111111111111111111111111112' or token_sold_mint_address = 'So11111111111111111111111111111111111111112' ) AND project = 'raydium' ), -- 去除首个交易是sell的 tx_detail_v2 as ( select v2.*, min(case when v2.tx_type='BUY' then v2.block_time else current_timestamp end) over (partition by v2.trader_id,v2.token_address) as min_buy_time, max(case when v2.tx_type='SELL' then v2.block_time else current_timestamp - interval '3' month end) over (partition by v2.trader_id,v2.token_address) as max_sell_time from tx_detail as v2 ), tx_detail_v3 as ( select * from tx_detail_v2 where block_time>=min_buy_time ), user_agg as ( select td.trader_id, td.token_address, count(1) as token_tx_cnt, sum(case when tx_type='BUY' then 1 else 0 end) as buy_cnt, sum(case when tx_type='BUY' then sol_token_amount else 0 end) as sol_buy_amt, sum(case when tx_type='BUY' then spl_token_amount else 0 end) as spl_buy_amt, sum(case when tx_type='SELL' then 1 else 0 end) as sell_cnt, sum(case when tx_type='SELL' then sol_token_amount else 0 end) as sol_sell_amt, sum(case when tx_type='SELL' then spl_token_amount else 0 end) as spl_sell_amt, date_diff('second',min(case when tx_type='BUY' then block_time else current_timestamp end),min(case when tx_type='SELL' then block_time else current_timestamp end)) as holding_time, max(block_time) as last_tx_time, min(block_time) as first_tx_time from tx_detail_v3 as td group by td.trader_id,td.token_address ), user_agg_v2 as ( select trader_id, sum(case when sol_sell_amt-sol_buy_amt > 0.7 * sol_buy_amt then 1 else 0 end ) as win_cnt, sum(case when sol_sell_amt-sol_buy_amt <= 0.7 * sol_buy_amt then 1 else 0 end ) as failed_cnt, sum(case when sol_sell_amt-sol_buy_amt > 0.7 * sol_buy_amt then 1 else 0 end ) * 100 /count(token_address) as win_rate, count(token_address) as tx_token_cnt, sum(sol_sell_amt-sol_buy_amt) as earn_amt, sum(sol_sell_amt-sol_buy_amt)* 100 / sum(sol_buy_amt) as roi, approx_percentile((sol_sell_amt-sol_buy_amt)*100/sol_buy_amt,0.5) as median_roi, min((sol_sell_amt-sol_buy_amt)*100/sol_buy_amt) as min_roi, max((sol_sell_amt-sol_buy_amt)*100/sol_buy_amt) as max_roi, sum(sol_sell_amt) as sol_sell_amt, sum(sol_buy_amt) as sol_buy_amt, approx_percentile(sol_buy_amt,0.5) as median_sol_buy_amt, approx_percentile(holding_time,0.5) as median_holding_time, approx_percentile(token_tx_cnt,0.5) as median_token_tx_cnt, approx_percentile(buy_cnt,0.5) as median_token_buy_cnt, min(buy_cnt) as min_token_buy_cnt, max(buy_cnt) as max_token_buy_cnt, approx_percentile(sell_cnt,0.5) as median_token_sell_cnt, min(sell_cnt) as min_token_sell_cnt, max(sell_cnt) as max_token_sell_cnt, min(holding_time) as min_holding_time, max(holding_time) as max_holding_time, max(last_tx_time) as last_tx_time, min(first_tx_time) as first_tx_time from user_agg as ua where spl_buy_amt >= spl_sell_amt group by trader_id ) select t.*,uh.sol_balance from user_agg_v2 as t left join solana_utils.latest_balances as uh on t.trader_id= uh.address where 1=1 -- and trader_id ='ATHyhTYArq8ixrZHk46eC6JUkivCGiKkXZfKMQGA1L74' and first_tx_time > current_timestamp - interval '3' day and ((win_rate > 65 and tx_token_cnt >= 12 and earn_amt > 10) or (win_rate > 70 and tx_token_cnt >= 9 and earn_amt > 10) or (win_rate > 80 and tx_token_cnt >= 7 and earn_amt > 10) or (win_rate > 90 and tx_token_cnt >= 5 and earn_amt > 5) or (win_rate = 100 and tx_token_cnt >= 3 and earn_amt > 5) or (win_rate = 100 and tx_token_cnt >=1 and min_roi > 300) ) and median_holding_time >= 30 and median_holding_time < 60 * 60 * 2 and roi > 100 and last_tx_time > current_timestamp - interval '1' day and median_token_buy_cnt = 1 and median_token_sell_cnt > 2 and median_sol_buy_amt >= 0.1 and tx_token_cnt < 60 and median_roi > 70 and (uh.sol_balance > 1 or uh.sol_balance is null) order by roi desc;
- 稳定币的流入流出
这个只是需要计算固定的某些币('USDT','USDC','DAI','USDe','sUSDe','sUSD','FRAX','sDAI','GHO','USDM' 等)的 流入流出就好了。 - 远古地址、鲸户地址
这些主要是从余额表里面获取的 - 交易所的充提币
应该transaction里面有带一些固定的交易地址是某些交易所的(不确定)
- 余额表
3.3 ads 层
主要是一些展示层的数据,这个要根据具体的dashboard需求来了
4. OLAP库
- 建议选择dorisdb,相对于clickhouse,join的效果 会更好,并且支持大批量的update,在流数据中实时计算余额表等都有更好的效果。
- 如果有时序性数据库的需求 也可以使用influxdb
- 如果展示层的数据量小,也可以使用mysql等oltp库
5. dashboard
- 可以使用一些开源的比如superset 或者 metabase 等
- 也可以自研展示平台
- 建议自研一个数据管理平台,方便元数据管理、数据质量、数据安全等
6. data governance
元数据管理
- 将数据资产的CRUD放到元数据管理平台
- 自动根据sql获取数据血缘
- 可以选择 apache atlas 等图数据库
数据质量
- 针对所有的数据资产创建对应的数据质量校验规则
- 校验失败告警
- 数据质量分上传data governance platform,并且在data map展示
数据安全
- 建立统一的网关,对数据资产的所有CRUD鉴权和管控
- 所有针对数据资产的操作全部收拢到平台侧
数据地图
- 数据入口
- 搜索数据资产
- 展示数据资产的元数据信息和数据样例
7. Dashboard
- 前期建议开源: superset 、metabase 等
- metabase 更适合产品经理操作
- superset 复杂性更高一点,但是也更专业一点,图更多一点
多空信号
- 隐形钱地址/聪明钱地址的 BTC / ETH 大的币种的突然大幅流入 产生 做多信号
- 稳定币的突然大幅流出产生 做多信号
- 在1和2的基础上,针对小的币种也做同样的 隐形钱和聪明钱的提示,去选择币种操作
网友评论