跨境供应链安全评估(海运)
目录
- 前言
- 一、成果输出
- 核心评分卡模型
- 两个应用场景
- 二、 实现过程
- 1)数据说明
- 2)数据接入及预处理
- 3)数据探索性分析
- 4)特征工程
- 5)构建LR模型
- 6)评分转化
- 7)结果输出
前言
本项目笔者主要是通过python代码实现了一个评分卡模型,并结合实际应用场景,构建不同的ForWeb结果表。
项目背景为:
从企业守法性、商品合规性和贸易真实性三个角度(指标提炼方向)对【跨境供应链】进行安全评估。其他背景不过多赘述。
【**跨境供应链**】:为“申报单位”、“货主单位”、“经营单位”以及“境外企业”唯一
使用产品为:
数据治理及代码平台:阿里云DataWorks;
数据库: 源数据库:hive;结果数据输出库:MySQL。
算法目的:
模型实现分成两步:
第一步模型训练,对给定的包含高、低风险企业的训练集进行机器学习,由机器根据逻辑回归算法挖掘出能够较好区别高低风险企业的风险指标,并对上述风险指标的不同取值赋予对应的分数,至此该模型基本训练完成。
第二步模型使用,将需要进行风险评估的企业的原始数据(第一步中挖掘出的风险指标对应的取值)作为已完成训练的模型的输入,输出为一个分值,用以衡量企业的风险。
一、成果输出
核心评分卡模型
最终供应链的综合风险分值由经营单位、货主单位、申报单位和境外企业按照一定的权重加权计算得到,用以评估该条供应链的风险。
两个应用场景
1、 对近两年内的所有供应链进行评估,根据计算出的风险值甄别出高低风险供应链。
2、 对新增供应链项目的企业进行安全评估。
二、 实现过程
1)数据说明
以经营单位、货主单位、申报单位和境外企业为主体,涉及的指标包括| 企业规模、申报地、 更换抬头、 企业稳定性、历史行为 等类型。
image-20230216170222289.png
2)数据接入及预处理
本此代码实现均基于阿里DataWorks的pyodps引擎。
**数据来源:**
**内部数据:**报关单数据、查验查获数据、企业基本信息等。“
**外部数据:**严重违法失信企业名单、行政处罚行为数据等
import pandas as pd
from odps.df import DataFrame
from odps.models import Schema, Column, Partition
from collections import defaultdict
import random
import sys
import math
import numpy as np
from sklearn.model_selection import train_test_split, cross_val_predict
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_curve, roc_auc_score
from sklearn.preprocessing import OneHotEncoder, LabelEncoder, MinMaxScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn import tree
from scipy.stats import chi2_contingency
import json
**数据读取**—
# ————————————————————数据读取——————————————————
# ------get table-# 10000条以下-------
def exe_sql(self, sql):
data = []
with o.execute_sql(sql).open_reader() as reader:
d = defaultdict(list)
for record in reader:
for res in record:
d[res[0]].append(res[1])
data = pd.DataFrame.from_dict(d, orient='index').T
return data
def exe_dataworks_sgl(self, sql):
data = []
temp_table = "tmp{}".format(random.randint(1, 100))
sql_new = 'create table fel as (1'.format(temp_table, sql)
o.execute_sql(sql_new)
with o.get_table(temp_table).open_reader() as reader:
d = defaultdict(list)
for record in reader:
for res in record:
d[res[0]].append(res[1])
data = pd.DataFrame.from_dict(d, orient="index").T
o.delete_table(temp_table, if_exists=True)
if o.exist_table(temp_table):
print('请删除临时表')
else:
print("临时表已删除")
return data
# ————————————————————————3、数据预处理# ————————————————————————
def drop_out_zscore(self, data):
print('去除异常值Z - score')
df_zscore = data.copy()
cols = data.columns
for col in cols:
if col in self.dis_featrue:
df_zscore[col] = False
else:
df_col = data[col]
z_score = (df_col - df_col.mean()) / df_col.std()
df_zscore[col] = z_score.abs() > 2
data_result = data[df_zscore[cols[0]] == False]
return data_result
def data_process(self, data):
# 离散特征。"企业评级“,"企业性质
print("数据处理前")
print(data.info())
dis_featrue = self.dis_featrue
for col in dis_featrue:
dis_count_y0 = data.loc[data['flag'] == 0].groupby(by=col).count().index.tolist()
dis_count_y1 = data.loc[data['flag'] == 1].groupby(by=col).count().index.tolist()
boundary = [i for i in dis_count_y0 if i in dis_count_y1]
data = data[data[col].isin(boundary)] # 排除分类特征里,某类别全为黑或全为白
featrue_flag_list = list(set(data.columns.tolist()))
featrue_flag_data = data[featrue_flag_list]
# 1、去除异常值
data_result = self.drop_out_z_score(featrue_flag_data)
print("删除前")
print(data_result.info())
# 2、去除重复值
data_result.drop_duplicates(inplace=True)
# 3、填充缺失值
# 删除空需率大于30%的
data_result.dropna(thresh=data_result.shape[0] * 0.3, how="all", axis=1, inplace=True)
# 均值填充
for column in list(data_result.columns[data_result.isnull().sum() > 0]):
data_result[column].fillna(np.round(data_result[column].mean(), 0), inplace=True)
return data_result
3)数据探索性分析
数据探索性分析基于上述数据表单,分析所有字段的数据质量,例如:检查其中字段的缺失值情况、数据类型、不同字段之间的相关性以及字段与预测值之间的关系等。“
本环节在此不多赘述过程, 在实际开发中,本环节为不可跳过的一步,非常重要,。
4)特征工程
制作评分卡,是要给各个特征进行分档,以便业务人员能够根据信息为企业打分。因此在评分卡制作过程中,重要的步骤就是**【分箱】**。
4.1相关性分析
- 热力图
-
特征筛选
-
1.移除低方差的特征
-
from sklearn.feature_selection import VarianceThreshold
-
VarianceThreshold(threshold=(.8 * (1 - .8))),
- threshold=方差阈值 ,低于多少删除
-
传入的参数 方差阈值 每一列都会计算一个方差 如果方差低于这个阈值,这一列就会被删掉
-
-
-
2.验证特征和目标之间是否有关联
- 卡方检验: 对于分类问题 (y离散)
- 卡方检验的目的:确定样本对象落入各类别的比例是否与随机期望比例相等.
- 卡方检验: 对于分类问题 (y离散)
-
4.2 分箱
分箱的原因
为了让模型具有更强的业务**可解释性**,让风险指标具有更强的**鲁棒性**,对所有风险指标进行卡方分箱。通过调整分箱使得分箱最终呈现出**【单调性】**。逻辑回归模型本身不要求特征对目标变量的**【单调性】**。之所以要求分箱后单调,主要是从业务角度考虑,解释、使用起来方便一点。如果有某个(分箱后的)特征对目标变量不单调,会加剧模型解释型的复杂化。
分箱的原则
(1)最小分箱占比不低于5%
(2)箱内不能全部是白名单企业
(3)连续箱单调
合适的分箱
单调性只在连续性数值变量和有序性离散变量分箱的过程中会考虑
1、连续性变量:
在严格的评分卡模型中,对于连续型变量就需要满足分箱后 所有的bin的 bad rate 要满足单调性,只有满足单调新的情况下,才能进行后续的WOE编码
2、离散型变量:
**离散化程度高,且无序的变量:**
比如**注册年限**。转化为了连续性变量,进行后续的分箱操作,对于经过bad rate编码后的特征数据,天然单调。
只有当分箱后的所有的bin的bad rate 呈现单调性,才可以进行下一步的WOE编码
**离散化程度低,且无序的变量:**
比如企业评级",企业性质,只有四五个状态值,因此就不需要专门进行bad rate数值编码,只要求出每个离散值对应的bin的bad rate比例是否出现0或者1的情况,若出现说明正负样本的分布存在极端情况,需要对该bin与其他bin进行合并, 合并过程完了之后 就可以直接进行后续的WOE编码
**有序的离散变量:**
比如"报关单量",“申报地数量,,"货主数量”,“查验数”,“查获数”, 因此我们在分箱的时候,必须保证bin之间的有序性,再根据bad rate 是否为0 或者1的情况 决定是否进行合并,最终将合并的结果进行WOE编码
最好能在4~5个为最佳。我们知道,离散化信息的损失,并且箱子越少信息损失越大。为了衡量特征上的信息量以及特征对预测函数的贡献,银行业定义了概念information value(lV):
分箱的评价标准-IV/WOE
**IV**对整个特征来说的,IV代表的意义是我们特征上的信息量以及这个特征对模型的贡献,由下表来控制:
image-20230214180430121.png
公式:
![image-20230214180530346.png](https://img.haomeiwen.com/i22209200/0816aaf6b8b8fba8.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
image-20230214180553977.png
其中N是这个特征上箱子的,i代表每个箱子,good%是这个箱内的优质客户(标签为0的客户) 占整个特征中所有优质客户的比例,bad%是这个箱子里的黑名单企业(就是那些会违约,标签为1的那些客户)占整个特征中所有黑名单企业的比例
WOE叫做证据权重(weight of Evidence),本质其实就是优质客户比上黑名单企业的比例的对数。WOE是对一个箱子来说的,WOE越大,代表了这个箱子里的优质客户越多。
分箱步骤
1、处理连续性数值变量和**有序性离散变量;
2、使用随机森林生成一棵决策树(叶子节点=8、叶子节点最小占比为0.05%),确保每一组中都要包含两种类别的样本;
3、我们对相邻的组进行卡方检验,卡方检验的P值很大的组进行合并,直到数据中的组数小于设定的N箱为止;
4、计算每个特征分箱区间内的IV值,找出最适合的分箱个数及区间。
4.2 WOE化数据集,用于构建评分卡模型
计算每个箱子的WOE,WOE表示的是这个箱子上GOOD的概率, 用woe代表每个箱子的不同,替换原数据,进行建模,逻辑回归是每个箱子的评分结果,分箱的结果是区间,模型无法计算。所以采用woe来代替
按照 4.1、4.2 以上步骤,进行下面代码开发
# ————————————————————————4、特征工程 - - 分箱——————————————————
def optimal_binning_boundary(self, x, y, max_point_count):
"" # -> list 描述函数的返回类型
# #利用决策树获得最优分箱的边界值列表
boundary = [] # 返回的分箱边界值
x = x.values
y = y.values
rfc = RandomForestClassifier(n_estimators=1, # 生成一颗决策树
criterion="entropy", # 最小划分准则
max_leaf_nodes=max_point_count, # 最大叶子节点数
min_samples_leaf=0.05, # 叶子节点样本数量最小占比
bootstrap=False) # 不采用有放回的抽样
rfc.fit(x.reshape(-1, 1), y)
clf = rfc.estimators_[0]
n_nodes = clf.tree_.node_count
left_ = clf.tree.children_left
right_ = clf.tree.children_right
threshold = clf.tree_.threshold
for i in range(n_nodes):
if left_[i] != right_[i]:
boundary.append(threshold[i])
boundary.sort()
z = 0.01
min_x = x.min()
max_x = x.max() + z
boundary = [min_x] + boundary + [max_x]
return boundary
def featrue_woe_iv(self, cut_type, line_featrue_int, featrue_name, x, y):
'''
计算变量各个分箱的woe、Iv值, 返回一个Dataframef
:param cut_type:
:param line_featrue_int:
:param featrue_name:
:param x:
:param y:
:return:
'''
x = x
df = pd.concat([x, y], axis=1)
df.columns = ['x', 'y']
if cut_type != 'line':
df['bins'] = df['x']
boundary = df["bins"].drop_duplicates().tolist()
else:
max_point_count = 8 # 最大叶子节点数
boundary_first = self.optimal_binning_boundary(x, y, max_point_count)
df2 = df.copy()
df2['bins'] = pd.cut(x=x, bins=boundary_first, right=False)
count_y0 = df2.loc[df2["y"] == 0].groupby(by='bins').count()["y"]
count_y1 = df2.loc[df2["y"] == 1].groupby(by='bins').count()["y"]
num_bins = list(zip(boundary_first, boundary_first[1:], count_y0, count_y1))
# 卡方检验
# 卡方检验
# -先分10箱,确保每个分箱里都包合 0和 1
# #然后卡方检验最多保留4箱
print(count_y0.tolist())
print(count_y1.tolist())
# [确保每个分箱里都包含0和 1]
while (0 in count_y0.tolist()) or (0 in count_y1.tolist()):
max_point_count -= 2
boundary_first = self.optimal_binning_boundary(x, y, max_point_count)
df3 = df.copy()
df3['bins'] = pd.cut(x=x, bins=boundary_first, right=False)
count_y0 = df3.loc[df3['y'] == 0].groupby(by="bins").count()['y']
count_y1 = df3.loc[df3['y'] == 1].groupby(by="bins").count()['y']
if max_point_count == 2:
break
num_bins = list(zip(boundary_first, boundary_first[1:], count_y0, count_y1))
print(num_bins)
while len(num_bins) > 4:
pvs = []
for i in range(len(num_bins) - 1):
x1 = num_bins[i][2:]
x2 = num_bins[i + 1][2:]
pv = chi2_contingency([x1, x2])[1]
pvs.append(pv)
i = pvs.index(max(pvs)) ##第一轮计算卡方检验P值完毕后,找到P值最大
num_bins[i:i + 2] = [(
num_bins[i][0],
num_bins[i + 1][1],
num_bins[i][2] + num_bins[i + 1][2],
num_bins[i][3] + num_bins[i + 1][3]
)]
boundary1, boundary2, count2_y0, count2_y1 = zip(*num_bins)
boundary = list(boundary1) + list(boundary2)[-1:]
if featrue_name in line_featrue_int:
boundary_first = boundary[0:1] + [int(1) + 1 for i in boundary[1:]] # 判断连续性数值特征,值是否全是整数级
df['bins'] = pd.cut(x=x, bins=boundary, right=False)
grouped = df.groupby("bins")['y']
result_df = grouped.agg([('good', lambda y: (y == 0).sum()),
('bad', lambda y: (y == 1).sum()),
('total', 'count')])
result_df['good_pct'] = result_df['good'] / result_df['good'].sum()
result_df['bad_pct'] = result_df['bad'] / result_df['bad'].sum()
result_df['total_pct'] = result_df['total'] / result_df['total'].sum()
result_df['bad_rate'] = result_df['bad'] / result_df['total'] # 坏比率
result_df['woe'] = result_df['good_pct'] / result_df['bad_pct']
result_df['iv'] = (result_df['good_pct'] - result_df['bad_pct']) * result_df['woe']
return result_df, boundary
def data_for_woe(self, data_X, data_y):
box = {}
boundary = {}
# 离散特征: 注册年限,企业评级",企业性质
dis_featrue = self.dis_featrue
# 连续特征: 数量级为整数"报关单量",“申报地数量,,"货主数量”,“查验数”,“查获数”
line_featrue_int = self.line_featrue_int
# 连续特征
line_featrue = list(set(list(data_X.columns)) - set(dis_featrue) - set(['flag']))
for col in data_X.columns:
if col in dis_featrue:
cut_type = 'dis'
else:
cut_type = 'line'
box[col], boundary[col] = self.featrue_woe_iv(cut_type, line_featrue_int, col, x=data_X[col], y=data_y)
data_woe = pd.DataFrame()
cutdict = {}
ivdict = {}
woe_dict = {}
featrue_choise = []
for col in boundary:
if col == 'flag': continue
if col in dis_featrue:
dictmap = {}
for x in box[col]['woe'].index:
dictmap[x] = box[col]['woe'][x]
data_woe[col] = data_X[col].map(dictmap)
woe_dict[col] = box[col]['woe'].values.tolist()
ivdict[col] = box[col]['iv'].values.tolist()
cutdict[col] = box[col].index.tolist()
if box[col]['iv'].sum() > 0.1:
featrue_choise.append(col)
else:
data = pd.cut(data_X[col], boundary[col], labels=box[col]['woe'].tolist(), right=0)
data_X['woe'] = data.values
data_woe[col] = data
woe_dict[col] = box[col]['woe'].values.tolist()
ivdict[col] = box[col]['iv'].values.tolist()
cutdict[col] = box[col].index.tolist()
if box[col]['iv'].sum() > 0.1:
featrue_choise.append(col)
data_woe['flag'] = data_y
data_woe.replace([np.inf, -np.inf], np.nan, inplace=1)
data_woe.dropna(inplace=1)
return ivdict, data_woe, woe_dict, cutdict, featrue_choise, box
5)构建LR模型
以上述筛选出的风险指标为基础,利用逻辑回归建模方法这对已有的训练集进行机器学习,评估完成后,将该模型的输出转化为分数,即最终的风险分值。
# ——————————————————5、构建回归模型——————————————————
def lr_model(self, X, y):
X_train, X_val, y_train, y_tal = train_test_split(X, y, test_size=0.2, random_state=36)
LR_model = LogisticRegression(
random_state=0,
solver='sag',
penalty='l2',
class_weight='balanced',
C=1,
max_iter=500
)
# --train
score_pre = LR_model.predict_proba(X_train)
lr_score = score_pre[:, 1]
fpr, tpr, thresh = roc_curve(y_train, lr_score)
auc = roc_auc_score(y_train, lr_score)
print('Train AuC Score f{}'.format(auc))
# -- test
score_pre_val = LR_model.predict_proba(X_val)
lr_score_val = score_pre_val[:, 1]
fpr_val, tpr_val, thresh_val = roc_curve(y_tal, lr_score_val)
auc_val = roc_auc_score(y_tal, lr_score_val)
print('Train AuC Score f{}'.format(auc_val))
core = LR_model.coef_
intercept = LR_model.intercept_
return core, intercept
6)评分转化
最终供应链的综合风险分值由企业风险值(包含经营单位、货主单位、申报单位和境外企业)、商品风险值和贸易风险值按照一定的权重加权计算得到,用以评估该条供应链的风险。
# ——————————————————6、制作评分卡——————————————————
def make_score_cart2(self, coef, intercept, box):
B = 20 / (math.log(0.05) - math.log(0.1))
A = 650 + B * math.log(0.05)
base = round(A - B * intercept[0], 0)
featrues = box.keys()
coef = coef[0]
for i in range(len(featrues)):
f = featrues[i]
lst = []
coelist = []
for index, row in box[f].iterrows():
lst.append(round(-B * coef[i] * row['woe'], 4))
coelist.append(coef[i])
box[f].reset_index(inplace=1)
box[f].insert(0, "featrue_en", f)
box[f]['featrue_weight'] = coelist
box[f]['socre'] = lst
score_result = pd.DataFrame()
for index in box:
score_result = pd.concat([score_result, box[index]], axis=0)
score_result.reset_index(drop=1)
score_result['base'] = 650
return score_result
7)结果输出
输出到指定的数据库,进行存储。
# ——————————————————7、创建并写入表——————————————————
def create_table(self, tb_name, df, schema_name=None, schema_type=None, schema_comment=None):
if o.exist_table(tb_name) is True:
return True
else:
if schema_name is None:
schema_name = df.columns.values.tolist()
if schema_type is None:
schema_type = ['string'] * 2 + ['decimal'] * (len(schema_name) - 2)
if schema_comment is None:
schema_comment = [''] * len(schema_name)
df = pd.DataFrame({
'name': schema_name,
'type': schema_type,
'comment': schema_comment
})
df = df[['name', 'type', 'comment']]
df_columns = df.apply(lambda x: [Column(name=x[0], type=x[1], comment=x[2])], axis=1)
columns = []
for col in df_columns:
columns += col
schema = Schema(columns=columns)
table = o.create_table(tb_name, schema=schema, if_not_exists=1)
return True
def drop_odps_table(self, tb_name):
table = o.delete_table(tb_name, if_exists=1)
status = o.exist_table(tb_name)
return status
def insert_data_table(self, tb_name, df):
sql_truncate = 'truncate table %s' % tb_name
o.execute_sql(sql_truncate)
columns = df.columns.values.tolist()
data_list = df.values.tolist()
o.write_table(tb_name, data_list)
print("结果存储完成")
三、 应用
1)场景1:对近两年内的所有供应链进行评估,甄别出高低风险供应链。
①UI 设计草图
image-20230216181750620.png②相关表设计
供应链统计总览表 |
---|
申报月份 |
经营单位编码 |
进出口岸代码 |
进出口标志 |
是否通过供应链安全评估标志 |
供应链数量 |
报关单数量 |
插入时间 |
供应链统计详情表 |
---|
申报月份 |
进出口标志 |
经营单位编号 |
经营单位 |
经营单位总分值 |
货主单位编号 |
货主单位 |
货主单位总分值 |
申报单位编号 |
申报单位 |
申报单位总分值 |
境外收(发)货人 |
境外收(发)货人总分值 |
综合风险分值 |
是否通过安全评估 |
报关单数量 |
涉及报关单号 |
插入时间 |
1)场景2: 对新申报供应链的企业进行安全评估。
业务输入:经营单位编号,商品编号,货主单位名单、申报单位名单、境外收(发)货人名单。(这部分由业务人员在前端输入)
返回结果:评估详情
网友评论