专家系统与机器学习的概念
什么是人工智能
现阶段主流实现人工智能的手段是:大数据 + 机器学习。但实现人工智能的方法并不仅局限于此,并且机器学习也仅仅是目前主流方法中的一个子集而已。

深度学习算是机器学习的一个特例,主要运用在计算机视觉中,具体会在计算机视觉中详细讲解。
之所以说这是现阶段对人工智能的定义, 因为人工智能还是在迅猛发展的,上个世纪 70 年代的时候人们对人工智能的定义还是专家系统。实现OCR的方法也可以是纯粹的的图像解析方案,所以随着科技的进步,未来人工智能的形态会是什么样子可能还是未知的。
什么是专家系统
信用卡反欺诈系统说起(上个世纪实现的人工智能的系统)
在银行中有一群业务专家, 他们的工作就是根据自己的知识和经验向系统中输入一些规则。
例如:某一张卡在一个城市有了一笔交易,之后1小时内在另一个城市又有了一笔交易。这些专家根据以前的经验判断这种情况是有盗刷的风险的,因为在短时间内人是很难跨越城市的,所以业务人员判断当出现这种情况时很可能是另一个人在盗刷用户的信用卡。
类似的业务专家们在系统中输入了1千多条这样的规则,组成了一个专家系统。这个专家系统是建立在人类对过往的数据所总结出的经验下建立的。
什么是机器学习
我们可以把专家系统看成一个大脑(知识和经验),业务是受这个大脑控制的。但这个大脑(总结的能力)是有极限的,我们要知道这种规则从 0 条建立到 1 条是很容易的,但是从几千条扩展到几千零一条是很难的。 因为你要保证新的规则有效,要保证它不会跟之前所有的规则冲突,所这很难,因为人的分析能力毕竟是有限的。
引入机器学习,给机器学算法中灌入大量的历史数据进行训练,它跟人类行为很像的一点就是它可以从历史数据中找到规律,而且它的分析能力更强。
在信用卡反欺诈的例子里,一个小时之内的跨城市交易记录是一个规则,但如果机器学习来做的话,它可能划分的更细,例如 10 分钟之内有个权重,10 分钟到 20 分钟有个权重,1 小时 10 分钟也有个权重。 也就是说它把这些时间段拆的更细。
并且相关的算法可以自动帮助人类去探索更加有用的规则,以及规则与规则之间联系。例如虽然我是 1 小时内的交易记录跨越城市了,但是我再哪个城市发生了这类情况会有不同的权重,发生的时间有不同的权重,交易数额也有不同的权重。也就是说机器学习能帮助我们找个更多更细隐藏的更深的规则。 以前银行的专家系统有 1000 多条规则,引入了机器学习后我们从10亿个规则中挑选了8000W条有效规则。我们可以把专家系统看成是一个比较小的大脑,而机器学习是更大的大脑。 所以说我们叫它机器学习,是因为它像人类一样可以从历史中库刻画出规律,只不过它比人类的分析能力更强。
特征概念:离散与连续
什么是特征
通过前面的介绍,我们知道机器学习就是一个更加强大的大脑,一个更加强大的专家系统。
在这个系统中存在大规模的规则来满足用户的需要。而在人工智能领域中这些规则中的主体就叫做特征,这些特征构成了人工智能最主要的组成部分。
比如还是信用卡反欺诈这个场景,交易城市是一个特征,交易金额是一个特征,交易时间还是一个特征,人工智能的流程就是用户需要筛选可能会对识别结果产生重要影响的特征,然后机器学习则会按照算法去训练出每个特征所占的权重。
离散特征与连续特性
如下图数据所示,观察数据的特点

- 离散特征:职业,爱好,工种等可枚举的字段称为离散特征,如上图中title通常会被当场一个离散特征处理。
- 连续特征:薪水,房价等这些不可枚举的具体的数字的字段称为连续特征,如上图中的薪水通常会被当成是连续特征处理。
离散特征与连续特征的区别
离散特征与连续特征最大的区别在于算法在计算对应字段的时候如何处理。
- 离散特征来说算法会将每一个唯一的值作为一个独立的特征,比如我们有用户表, 其中职业这个字段中我们有100种职业,也就是说这张表不论有多少行,职业这个字段的取值就是这100个职业中的一个。而对于算法来说每一个职业就是一个独立的特征。 老师是一个特征,学生是一个特征,程序员、环卫工人、产品经理、项目经理等等这些都是独立的特征。 也就是说对于这张表,不论有多少行,算法针对职业这一字段提取出来的特征就是固定的这100个。
- 连续特征,它是一个数字的数字,算法会把它当成一个独立的特征,也就是当算法把一个字段提取成为连续特征后,不管这张表有多少行,这个字段固定就只有1个特征被提出来。只不过这个特征值是会变动的。
但是一个字段是否是连续特征或者离散特征并不是固定的,职业这样的字段可以被算法进行连续化处理,像薪水这样的字段也可以被算法进行离散化处理。并不是一成不变的。
比如在spark ml(机器学习的包)中可以将连续特征进行二值化或者分桶来达到转换成离散特征的目的:
1 | # 连续特征二值化 |
执行的结果
1 | +------+--------+ |
从结果来看,小于2.4处理为0,大于2.4处理为1
1 | # 连续特征分桶离散化 |
执行的结果
1 | |features|bucketedFeatures| |
注意:学习Spark相关知识
时序特征
普通的特征都是在一行内进行提取,而在真实的场景中,仅仅是这样单行的特征是无法满足业务的需要的。
在建模过程中,用户往往需要统计更加复杂的特征。比如建模工程师会觉得用户在过去一周内的单笔最大消费额是非常有用的特征,而这样的特征明显是需要从很多行的数据中进行计算。 而这样的特征一般被称作多行特征或者时序特征,因为这类特征往往都是在计算一段时间内的数据特性,所以往往被叫做时序特征。在一些支持sql的计算框架和数据库中,窗口函数往往会被用来提取时序特征。
比如在spark中可以这样编写代码:
1 | from pyspark import SparkContext, SparkConf, SQLContext |
执行的结果
1 | +-----+------+---+--------+-----+---------+ |
- partitionBy: 根据哪个字段分组
- orderBy: 根据哪个字段进行排序
- rowsBetween: 取哪些行进行计算。unboundedPreceding代表当前行之前的无限的行数,currentRow代表当前行。例如,要选择当前行的前一行和后一行。可以写为rowsBetween(-1, 1)
模型:特征与权重的数据库
什么是模型
模型是什么呢?简单来说我们可以把模型看做是一个数据库, 它负责存储特征以及对应的权重。通俗的来说,就是经验、规律,在历史的经验中找规律,总结出文件,文件就是模型。
我们知道机器学习其实简单来说就是无数的if else的组合,是一个极其庞大的专家系统。 这个系统里包括了非常多的规则来帮助用户来完成业务目标。之前也讲到了特征是如何进行抽取的,并且强调了特征是人工智能中重要的组成部分。所以模型要负责存储这些特征以及对应的权重,在使用这个模型的时候 ,它的行为已点类似下面的伪代码:
实际上在业务中,当用户数据被传输到人工智能系统中,程序会依照预先设定好的规则提取特征。然后到模型这个特征数据库中查询是否存在这条特征并取出这条特征的权重。
比如一条用户数据到来后,系统根据职业这个字段提取出了程序员这个特征。 然后就会到模型中查询程序员这个特征是否存在并且取出它对应的权重。类似这样的,系统会在用户数据中依次提取特征并到模型中进行查询。这样上面的伪代码就成了:
假如上面的伪代码的计算结果为0.8。那么这个值就是模型最终的计算结果了。 如果放在信用卡反欺诈场景中,这个识别结果代表了当前的交易记录是信用卡盗刷行为的概率是80%。是的,在信用卡反欺诈中人工智能模型所要计算的,正是用户行为属于盗刷行为的概率是多少。
以下一个实例进行说明:
根据数据来训练一个模型
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.sql import SparkSession
# 1. 初始化 SparkSession
spark = SparkSession.builder.appName("DataEncodedExample").getOrCreate()
# 2. 定义数据
# 假设 title_onehot 是一个长度为 3 的向量,代表了不同的职称类别
# 例如:[1.0, 0.0, 0.0] 代表 '工程师'
# [0.0, 1.0, 0.0] 代表 '经理'
# [0.0, 0.0, 1.0] 代表 '总监'
data = [
(30, Vectors.dense([1.0, 0.0, 0.0]), 50000.0, 0), # 年龄, title_onehot, 价格, 标签
(45, Vectors.dense([0.0, 1.0, 0.0]), 80000.0, 1),
(25, Vectors.dense([1.0, 0.0, 0.0]), 45000.0, 0),
(55, Vectors.dense([0.0, 0.0, 1.0]), 120000.0, 1),
(38, Vectors.dense([0.0, 1.0, 0.0]), 70000.0, 0),
(29, Vectors.dense([1.0, 0.0, 0.0]), 52000.0, 1),
(42, Vectors.dense([0.0, 1.0, 0.0]), 90000.0, 1),
(33, Vectors.dense([1.0, 0.0, 0.0]), 60000.0, 0)
]
# 3. 定义 Schema
schema = ["age", "title_onehot", "price", "label"]
# 4. 创建 PySpark DataFrame
data_encoded = spark.createDataFrame(data, schema=schema)
# 将所有特征组合成一个特征向量,转为计算机理解的行为
vectorAssembler = VectorAssembler(inputCols=["age", "title_onehot", "price"], outputCol="feature")
data_vector = vectorAssembler.transform(data_encoded)
# 将数据划分为训练集和验证集,一部分数据用来训练,而一部分数据用来测试模型的效果
(train_data, test_data) = data_vector.randomSplit([0.8, 0.2], seed=1234)
# 创建Logistic回归模型
lr = LogisticRegression(labelCol="label", featuresCol="feature")
# 定义参数网格
paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.01, 0.1, 1]).addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]).build()
# 定义评估指标
evaluator = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")
# 使用训练集和测试集进行模型调参和交叉验证
tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, trainRatio=0.8)
tvsModel = tvs.fit(train_data)
tvsModel.write().overwrite().save('./model')上面的代码中我们通过spark ml 初始化了一个逻辑回归算法(机器学习算法中的一种),经过模型训练后,我们可以把模型保存到本地的model目录下

通过spark的API读取保存下来的模型summary,查看都保存了哪些内容
1
2
3
4
5
6
7
8from pyspark import SparkContext, SparkConf, SQLContext
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
dataA = sqlContext.read.parquet("/Users/jinglv/PycharmProjects/learn-machine/spark_ml/model/bestModel/data/part-00000-a8f00af6-d413-406f-a917-8d4f98de960a-c000.snappy.parquet")
dataA.show(truncate=False)执行结果

numClasses: 代表分类数量,我们训练的是一个二分类模型,所以这里的数字是2(是欺诈行为,非欺诈行为)。
numFeatures:代表特征数量, 从上面的代码可以知道,我们提取了3个特征(”age”, “title_onehot”, “price”)。
coefficientMatrix:系数矩阵,系数矩阵是一个包含每个特征与目标变量之间关系的矩阵(可以理解为权重)。
interceptVector:截距向量,截距向量是一个包含每个目标变量的截距值的向量。
读取模型并用于模型推理(预测)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors
from pyspark.ml.tuning import TrainValidationSplitModel
from pyspark.sql import SparkSession
# 1. 初始化 SparkSession
spark = SparkSession.builder.appName("DataEncodedExample").getOrCreate()
# 2. 定义数据
# 假设 title_onehot 是一个长度为 3 的向量,代表了不同的职称类别
# 例如:[1.0, 0.0, 0.0] 代表 '工程师'
# [0.0, 1.0, 0.0] 代表 '经理'
# [0.0, 0.0, 1.0] 代表 '总监'
data = [
(30, Vectors.dense([1.0, 0.0, 0.0]), 50000.0, 0), # 年龄, title_onehot, 价格, 标签
(45, Vectors.dense([0.0, 1.0, 0.0]), 80000.0, 1),
(25, Vectors.dense([1.0, 0.0, 0.0]), 45000.0, 0),
(55, Vectors.dense([0.0, 0.0, 1.0]), 120000.0, 1),
(38, Vectors.dense([0.0, 1.0, 0.0]), 70000.0, 0),
(29, Vectors.dense([1.0, 0.0, 0.0]), 52000.0, 1),
(42, Vectors.dense([0.0, 1.0, 0.0]), 90000.0, 1),
(33, Vectors.dense([1.0, 0.0, 0.0]), 60000.0, 0)
]
# 3. 定义 Schema
schema = ["age", "title_onehot", "price", "label"]
# 4. 创建 PySpark DataFrame
data_encoded = spark.createDataFrame(data, schema=schema)
# 将所有特征组合成一个特征向量
vectorAssembler = VectorAssembler(inputCols=["age", "title_onehot", "price"], outputCol="feature")
data_vector = vectorAssembler.transform(data_encoded)
# 从模型文件中加载模型,
model = TrainValidationSplitModel.load('./model')
# 推理出预测的值(0 非欺诈行为 1 欺诈行为)
predictions = model.transform(data_vector)
predictions.show()执行结果

上面代码中, 我们通过
model.transform把数据输入给模型后,返回的predictions就是模型的推理结果了。