Jean's Blog

一个专注软件测试开发技术的个人博客

0%

业务场景

信用卡反欺诈

如何展开信用卡反欺诈

我们已经可以了解了信用卡反欺诈的运作原理了。只不过需要注意的是我们常用的算法都为监督学习算法。什么是监督学习呢,就是我们用来训练的数据必须有label(y值,标注,真实值),这样才可以指导损失函数和梯度下降算法进行迭代。 用通俗的语言说,我们需要从数据中学习到一些规律去判断用户的行为。 但用来学习的这些数据得有标准答案才行,这样我才能知道什么规律是正确的。

所以一个人工智能模型想要训练出来, 需要海量的已标注的数据(已经有答案的数据),而已标注的数据往往不是自动从系统中抓取出来的,而是由人工处理而成。

比如反欺诈系统中,往往需要业务专家人工的去排查每一条用户的数据并标记其label(是否是盗刷行为),经过这样大量的人工标注工作后,才可以收集到训练所需的海量数据。

或者换一个比较容易里理解的场景, 我们有一个图片分类的场景,可以识别人,猫,狗,老鼠。 为了训练出能够识别出这4种分类的模型,需要标注人员对海量的图片进行标注,这样算法才能学习到这4中类别的特征权重。网络上曾经有个段子,人工智能是有多少人工, 就有多少智能, 虽然是个段子,但其实也侧面说明了, 人工智能确实是建立在大量的人工基础上的场景。

建模流程

使用spark ml库写了一段代码来表示一个建模流程,我们就来模拟一下信用卡反欺诈模型是如何训练的. 这里我们的测试数据是我自己构建的。就不展示数据标注的工作了。

建模流程如下:

  1. 获取用户数据。
  2. 根据用户数据提取特征(拼表,计算时序特征等等),经过此步骤后一般会产出一张宽表。
  3. 当所有特征都计算好后,需要进行一定的特征转换,比如把离散特征进行独热编码,转换成计算机可以理解的形式。
  4. 将数据拆分为训练和验证两份数据,训练集交给算法进行建模。
  5. 模型训练好后,使用验证数据进行评估效果工作。
  6. 模型保存。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# @Time:2025/7/18 11:01
# @Author:jinglv
"""
建模流程如下:

1. 获取用户数据。
2. 根据用户数据提取特征(拼表,计算时序特征等等),经过此步骤后一般会产出一张宽表。
3. 当所有特征都计算好后,需要进行一定的特征转换,比如把离散特征进行独热编码,转换成计算机可以理解的形式。
4. 将数据拆分为训练和验证两份数据,训练集交给算法进行建模。
5. 模型训练好后,使用验证数据进行评估效果工作。
6. 模型保存。
"""
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# 初始化SparkContext
conf = SparkConf().setMaster("local").setAppName("Credit card anti-fraud scenario")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

# -----------------------------获取用户数据--------------------------------
# 定义用户数据,姓名,性别,年龄,职位,交易额(消费金额),以及最后的label(代表是否是欺诈行为,1.0为欺诈,0.0相反)
dicts = [
['frank', '男', 16, '程序员', 3600, 1.0],
['alex', '女', 26, '项目经理', 3000, 1.0],
['frank', '男', 16, '程序员', 2600, 0.0],
['asdf', '男', 16, '程序员', 2600, 0.0],
['fragfsnk', '男', 16, '程序员', 2600, 0.0],
['frasdfgnk', '男', 16, '程序员', 2600, 0.0],
['frsdfgank', '男', 16, '程序员', 2600, 0.0],
['frsdfgdfank', '男', 16, '程序员', 2600, 0.0],
['frsdfgdfankdsaf', '男', 16, '程序员', 2600, 0.0],
['frsdfgdfank342', '男', 16, '程序员', 2600, 0.0],
['frsdfgdfank445', '男', 16, '程序员', 2600, 0.0],
['frsdfgdfank756', '男', 16, '程序员', 3600, 1.0],
['hdfg', '男', 16, '程序员', 2600, 0.0],
['frsdfncvgdfank', '男', 16, '程序员', 2600, 0.0],
['wert', '男', 16, '程序员', 2600, 0.0],
['sdfg', '男', 16, '程序员', 2600, 0.0],
['frssdffgdfank', '男', 16, '程序员', 2600, 0.0],
['asdf', '男', 16, '程序员', 2600, 0.0],
['zxcv', '男', 16, '程序员', 2600, 0.0],
['frsdfgdfank', '男', 16, '程序员', 2600, 0.0],
['vzxcv', '男', 16, '程序员', 2600, 0.0],
['zxcv', '男', 16, '程序员', 3600, 1.0],
['frsdfgdcvfank', '男', 16, '程序员', 3600, 1.0],
['frsdfgdcvfankasdf', '男', 16, '程序员', 3600, 1.0],
['asfghffgh', '男', 16, '程序员', 3600, 1.0],
['dfgh', '男', 16, '程序员', 3600, 1.0],
['frsdfgdcvbnmvbvfank', '男', 16, '程序员', 3600, 1.0],
['v', '男', 16, '程序员', 3600, 1.0],
['dasdfsadf', '男', 16, '程序员', 3600, 1.0],
['gghg', '男', 16, '程序员', 3600, 1.0],
]
# 创建RDD
rdd = sc.parallelize(dicts, 3)
# 假设用户数据有名字,性别,年龄,职位,本次交易额,以及最后的label(代表是否是欺诈行为,1.0为欺诈,0.0相反)
dataf = sqlContext.createDataFrame(rdd, ['name', 'gender', 'age', 'title', 'price', 'label'])

# -----------------------------计算时序特征--------------------------------
# 通过spark定义一段窗口,计算出用户一段时间内的最大消费额
windowSpec = Window.partitionBy(dataf.gender)
windowSpec = windowSpec.orderBy(dataf.age)
windowSpec = windowSpec.rowsBetween(Window.unboundedPreceding, Window.currentRow)
dataf.withColumn('max_price', F.max(dataf.price).over(windowSpec)).show()

# -----------------------------转换特征--------------------------------
# 将非数值类型的字段转换为数值类型
stringIndexer = StringIndexer(inputCol="title", outputCol="title_num")
data_indexed = stringIndexer.fit(dataf).transform(dataf)
# 将类别特征进行独热编码,这是为了把字符串类型的离散特征,转换成计算机可以识别的数字
encoder = OneHotEncoder(inputCol="title_num", outputCol="title_onehot")
data_encoded = encoder.fit(data_indexed).transform(data_indexed)

# 将所有特征组合成一个特征向量
vectorAssembler = VectorAssembler(inputCols=["age", "title_onehot", "price"], outputCol="feature")
data_vector = vectorAssembler.transform(data_encoded)

# -----------------------------模型训练--------------------------------
# 将数据划分为训练集和验证集,一部分数据用来训练,而一部分数据用来测试模型的效果(randomSplit拆分,80%的数据用来训练,20%的数据用来验证,seed随机种子)
(train_data, test_data) = data_vector.randomSplit([0.8, 0.2], seed=1234)

# ----------------------------模型调参和交叉验证--------------------------------
# 创建Logistic回归模型,这是最典型的机器学习算法
lr = LogisticRegression(labelCol="label", featuresCol="feature")

# 定义模型训练要用的参数
paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.01, 0.1, 1]).addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]).build()

# 定义评估指标
evaluator = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")

# 使用训练集和验证进行模型调参和交叉验证
tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, trainRatio=0.8)
tvsModel = tvs.fit(train_data)

# 输出最佳参数(特征)组合
print("最佳参数组合: " + str(tvsModel.getEstimatorParamMaps()))

# 使用最佳参数组合训练模型
bestModel = tvsModel.bestModel

# ----------------------------模型预测--------------------------------

# 在测试集上评估模型效果,predictions中保存了每一条数据的预测结果
predictions = bestModel.transform(test_data)
result = evaluator.evaluate(predictions)
predictions.show()
print(result)

# ----------------------------模型保存--------------------------------
# 把模型保存到本地目录
tvsModel.write().overwrite().save('./model')

上面代码中predictions中保存了每一条数据的预测结果,如果我们打印它,会展示如下数据:

image-20250718111929289

上面的表格中prediction代表spark训练出的模型最终的识别结果,而probability则代表概率,其中第一条数据是0的概率是0.97498114032739。而最后我们把训练好的模型保存到本地中,以便以后我们后续使用。

推荐系统

推荐系统的问题

已经基本了解到了要如何构建一个二分类模型。我们都知道模型大体可以分成,回归,二分类和多分类。但推荐系统是属于哪一种场景呢,比如我们常见的广告推荐或者内容推荐,这些场景都是由系统来判断用户的喜好来推送广告或者视频内容,以追求更高的点击率和转化率。这种场景怎么看都不像跟这三种类型的算法有关系。

实现思路

其实解决这个问题的思路也比较简单, 我们可以遵循如下的原则:

  1. 借助专家系统,根据用户的信息初筛(定义些规则)一个候选的视频集合(比如1000个),比如可以先简单根据用户的年龄,性别,爱好,职业进行推测他喜欢的类型并过滤出候选集合。
  2. 训练一个二分类模型,这个模型用于推理出用户是否会点击这个视频。
  3. 将候选集合分别输入给模型进行推理。计算出每个视频会被用户点击的概率。
  4. 将最终的推理结果进行排序,取top n个概率最高的视频推送给用户。

这样我们就把一个推荐系统的问题转换成了一个二分类的问题。

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, StringIndexer, OneHotEncoder, \
VectorAssembler
from pyspark.sql import functions as F

conf = SparkConf().setMaster("local").setAppName("Recommendation system")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

# 定义数据, 性别,电影名称,label(代表是否是感兴趣行为,1.0为感兴趣,0.0相反)
dicts = [
['man', 'The Shawshank Redemption', 1.0],
['man', 'The Godfather', 1.0],
['man', 'Forrest Gump', 1.0],
['woman', 'Titanic', 1.0],
['woman', 'Forrest Gump', 0.0],
['woman', 'The Godfather', 0.0],
['woman', 'The Shawshank Redemption', 0.0],
['man', 'Titanic', 0.0],
['man', 'A Beautiful Mind', 0.0],
['woman', 'A Beautiful Mind', 1.0],
]
rdd = sc.parallelize(dicts, 3)
dataf = sqlContext.createDataFrame(rdd, ['gender', 'title', 'interested'])

# 将性别进行独热编码,以便把数据转换成算法可以识别的形式(根据性别进行独热编码)
stringIndexer = StringIndexer(inputCol="gender", outputCol="gender_num")
data_indexed = stringIndexer.fit(dataf).transform(dataf)
encoder = OneHotEncoder(inputCol="gender_num", outputCol="gender_onehot")
data_encoded = encoder.fit(data_indexed).transform(data_indexed)
data = data_encoded.select('gender_onehot', 'interested', 'title')
data.show()

# 使用分词器(根据电影名字进行分词)
tokenizer = Tokenizer(inputCol="title", outputCol="words")
# 使用停用词(去掉停用词,不需要的词汇,没有意义的词,比如:标点符号、哦、哈的语气词,介词等,一般会有停用词库)
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
# 将文本数据转换成词向量
vectorizer = CountVectorizer(inputCol="filtered_words", outputCol="final_words")

# 将所有特征组合成一个特征向量
vectorAssembler = VectorAssembler(inputCols=["gender_onehot", "final_words"], outputCol="features")

# 定义逻辑回归
classifier = LogisticRegression(labelCol="interested", featuresCol="features", maxIter=10)
# 定义流水线, 当数据来了以后就可以按顺序处理数据
pipeline = Pipeline(stages=[tokenizer, remover, vectorizer, vectorAssembler, classifier])
# 模型训练
model = pipeline.fit(data)

# 模型推理
predictions = model.transform(data)
evaluator = BinaryClassificationEvaluator(labelCol="interested", rawPredictionCol="rawPrediction")
accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)
predictions.show()

df_desc = predictions.orderBy(F.desc("probability"))
df_desc.show()

词向量

上面用于训练模型的数据中有一列是电影名称,我们会发现代码中我们使用了一系列NLP(Natural Language Processing,自然语言处理)的算法:

  • 分词器(tokenizer):用于在一个句子中提取一个一个的词。

  • 停用词(stop words):用于去掉一些语义无关的语气词,介词等,比如the或者中文中的语气词。 在模型训练中往往需要去掉这些词以去除噪音,优化模型空间,减少索引量等等。

  • 词向量(也叫词嵌入):可以理解为计算出词与词之间的关联性,从而训练出的围绕中心词的特征向量。

上述概念中可能词向量是最难以理解的,这里尽量尝试用简单易懂的语言来解释这个概念。 我们之前训练反欺诈模型的时候,也遇到过一些离散特征,比如title也是以文本形式存在的数据。

我们在反欺诈中处理这样的使用的one-hot(独热编码),独热编码也是一种处理离散特征常用的方法。假设我们有一群学生,他们可以通过三个特征来形容,分别是:

  • 性别:[“男”,“女”]

  • 年级:[“初一”,“初二”,“初三”]

  • 学校:[“一中”,“二中”,“三中”,“四中”]

我们用采用N位状态寄存器来对N个状态进行编码,拿上面的例子来说,就是:

性别 [“男”, “女”] N=2 男: 1 0 女: 0 1
年级 [“初一”, “初二”, “初三”] N=3 初一: 1 0 0 初二: 0 1 0 初三: 0 0 1
学校 [“一中”, “二中”, “三中”, “四中”] N=4 一中: 1 0 0 0 二中: 0 1 0 0 三中: 0 0 1 0 四中: 0 0 0 1

因此,当我们再来描述一个学生的时候(男生,初一,来自一中),就可以采用 [1 0 1 0 0 1 0 0 0]这样的形式来表示。这也一种用于特征组合(特征之间是有关系的)的实现方法之一。

或者我们也可以使用类似bitmap的方法做出一个one—hot向量来表示离散特征。 我们可以用类似下面的形式表达:

image-20250718151944852

假设职业这一列一共有100个值, 假设教师在编号6这个位置上,编号6所在位置ide值就是1,其他的值都是0,我们以这个向量来代表教师这个特征. 以此类推,如果学生代表的编号是10,那么10这个位置所在的值是1,其他位置的值都是0,用词向量来代表学生。 这样最后我们就有100个100维度的向量来表示这些特征(特征之间是没有关系)。

上面两种方法都是很常见的用来用来表达文本特征的方法,但它们的问题是词与词之间是独立的,互相没有关联。 比如我们的训练数据中有一个句子this is apple juice,我们期望当出现 this is orange __ 的时候,模型能够为我们推测出这个空白处也应该填写单词juice。 也就是我们希望模型能通过之前针对第一个句子的训练就能找到单词与单词之间的关系,模型能够知道appleorange是含义相似的词,从而能推测出orange后面也可以填写juice。 而这正是词向量要做的事情(词与词之间的关系)。

男人 女人 国王 王后 苹果 橘子
性别 -1 1 -0.95 0.97 0 0.01
食物 -0.01 0.01 -0.02 0.01 0.95 0.97
高贵程度 0.01 0.01 0.9 0.87 0.01 0

词向量可找到现有的,也可以自己训练。

如上图,词向量围绕这一些中心词(性别,事务,高贵程度),计算出每一个词与这些中心词的相关程度。而要得到这个词向量本身就需要相关算法训练出来,比如world2vec(词向量的一种算法):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# @Time:2025/7/18 15:29
# @Author:jinglv
from pyspark.ml.feature import Word2Vec

from pyspark.sql import SparkSession

# 初始化SparkSession
spark = SparkSession \
.builder \
.appName("dataFrame") \
.getOrCreate()

# 文本数据,进行切词
documentDF = spark.createDataFrame([
("Hi I heard about Spark".split(" "),),
("I wish Java could use case classes".split(" "),),
("Logistic regression models are neat".split(" "),)
], ["text"])

documentDF.show()

# Learn a mapping from words to Vectors.进行词向量映射训练,训练出5个中心词
word2Vec = Word2Vec(vectorSize=5, minCount=0, inputCol="text", outputCol="result")
model = word2Vec.fit(documentDF)

result = model.transform(documentDF)
for row in result.collect():
text, vector = row
print("Text: [%s] => \nVector: %s\n" % (", ".join(text), str(vector)))

执行结果:

1
2
3
4
5
6
7
8
Text: [Hi, I, heard, about, Spark] => 
Vector: [-0.044043296948075294,0.0017202366143465042,-0.04031781677622348,-0.030211083777248862,-0.04166501020081342]

Text: [I, wish, Java, could, use, case, classes] =>
Vector: [-0.01309338877243655,0.027492987191570655,0.028597419682357992,-0.02514699660241604,0.007802080828696489]

Text: [Logistic, regression, models, are, neat] =>
Vector: [0.01408876907080412,-0.03434107899665833,-0.011755409836769105,-0.012410750263370574,0.009409739542752505]

上面是一个训练词向量的代码,它的计算原理大概可以描述为:在文本中选取中心词并选取中心词前后数个单词,并训练出这些词会出现在中心词周围的概率。