任务介绍
英雄联盟(League of Legends,LoL)是一个多人在线竞技游戏,由拳头游戏(Riot Games)公司出品。在游戏中,每位玩家控制一位有独特技能的英雄,红蓝两支队伍各有五位玩家进行对战,目标是摧毁对方的基地水晶。水晶有多座防御塔保护,通常需要先摧毁一些防御塔再摧毁水晶。玩家所控制的英雄起初非常弱,需要不断击杀小兵、野怪和对方英雄来获得金币、经验。经验可以提升英雄等级和技能等级,金币可以用来购买装备提升攻击、防御等属性。对战过程中一般没有己方单位在附近的地点是没有视野的,即无法看到对面单位,双方可以通过使用守卫来监视某个地点,洞察对面走向、制定战术。
本数据集来自Kaggle,包含了9879场钻一到大师段位的单双排对局,对局双方几乎是同一水平。每条数据是前10分钟的对局情况,每支队伍有19个特征,红蓝双方共38个特征。这些特征包括英雄击杀、死亡,金钱、经验、等级情况等等。一局游戏一般会持续30至40分钟,但是实际前10分钟的局面很大程度上影响了之后胜负的走向。作为最成功的电子竞技游戏之一,对局数据、选手数据的量化与研究具有重要意义,可以启发游戏将来的发展和改进。
Powered by a@antior.cn
本任务是希望同学们依据注释的要求,对代码中空缺部分进行填写,完成决策树模型的详细实现,根据已有的对局前10分钟特征信息,预测最后获胜方是蓝色方还是红色方,了解执行一个机器学习任务的大致流程,并提交代码和实验报告。第一次作业也是一个机器学习小实验的例子,之后的作业可能不再提供预处理等流程代码,由同学们自己设计实验完成代码编写。
导入工具包
pandas是数据分析和处理常用的工具包,非常适合处理行列表格数据。numpy是数学运算工具包,支持高效的矩阵、向量运算。sklearn是机器学习常用工具包,包括了一些已经实现好的简单模型和一些常用数据处理方法、评价指标等函数。
from collections import Counterimport pandas as pd # 数据处理import numpy as np # 数学运算from sklearn.model_selection import train_test_split, cross_validate # 划分数据集函数from sklearn.metrics import accuracy_score # 准确率函数RANDOM_SEED = 2020 # 固定随机种子
读入数据
假设数据文件放在./data/目录下,标准的csv文件可以用pandas里的read_csv()函数直接读入。文件共有40列,38个特征(红蓝方各19),1个标签列(blueWins),和一个对局标号(gameId)。对局标号不是标签也不是特征,可以舍去。
csv_data = './data/high_diamond_ranked_10min.csv' # 数据路径data_df = pd.read_csv(csv_data, sep=',') # 读入csv文件为pandas的DataFramedata_df = data_df.drop(columns='gameId') # 舍去对局标号列
数据概览
对于一个机器学习问题,在拿到任务和数据后,首先需要观察数据的情况,比如我们可以通过.iloc[0]取出数据的第一行并输出。不难看出每个特征都存成了float64浮点数,该对局蓝色方开局10分钟有小优势。同时也可以发现有些特征列是重复冗余的,比如blueGoldDiff表示蓝色队金币优势,redGoldDiff表示红色方金币优势,这两个特征是完全对称的互为相反数。blueCSPerMin是蓝色方每分钟击杀小兵数,它乘10就是10分钟所有小兵击杀数blueTotalMinionsKilled。在之后的特征处理过程中可以考虑去除这些冗余特征。
另外,pandas有非常方便的describe()函数,可以直接通过DataFrame进行调用,可以展示每一列数据的一些统计信息,对数据分布情况有大致了解,比如blueKills蓝色方击杀英雄数在前十分钟的平均数是6.14、方差为2.93,中位数是6,百分之五十以上的对局中该特征在4-8之间,等等。
print(data_df.iloc[0]) # 输出第一行数据data_df.describe() # 每列特征的简单统计信息
增删特征
传统的机器学习模型大部分都是基于特征的,因此特征工程是机器学习中非常重要的一步。有时构造一个好的特征比改进一个模型带来的提升更大。这里简单展示一些特征处理的例子。首先,上面提到,特征列中有些特征信息是完全冗余的,会给模型带来不必要的计算量,可以去除。其次,相比于红蓝双方击杀、助攻的绝对值,可能双方击杀英雄的差值更能体现出当前对战的局势。因此,我们可以构造红蓝双方对应特征的差值。数据文件中已有的差值是金币差GoldDiff和经验差ExperienceDiff,实际上每个对应特征都可以构造这样的差值特征。
drop_features = ['blueGoldDiff', 'redGoldDiff','blueExperienceDiff', 'redExperienceDiff','blueCSPerMin', 'redCSPerMin','blueGoldPerMin', 'redGoldPerMin'] # 需要舍去的特征列df = data_df.drop(columns=drop_features) # 舍去特征列info_names = [c[3:] for c in df.columns if c.startswith('red')] # 取出要作差值的特征名字(除去red前缀)drop_features = []for info in info_names: # 对于每个特征名字df['br' + info] = df['blue' + info] - df['red' + info] # 构造一个新的特征,由蓝色特征减去红色特征,前缀为brdrop_features.append('red' + info)# 其中FirstBlood为首次击杀最多有一只队伍能获得,brFirstBlood=1为蓝,0为没有产生,-1为红drop_features.extend(['blueFirstBlood','redFirstBlood'])#print(drop_features)for c in drop_features:print(c)try:df.drop(columns=c, inplace=True)except:continue#df = df.drop(columns=drop_features_red)#df = df.drop(columns=['blueFirstBlood', 'redFirstBlood']) # 原有的FirstBlood可删除
重新打印数据
print(df.iloc[0]) # 输出第一行数据df.describe() # 每列特征的简单统计信息
特征离散化
决策树ID3算法一般是基于离散特征的,本例中存在很多连续的数值特征,例如队伍金币。直接应用该算法每个值当作一个该特征的一个取值可能造成严重的过拟合,因此需要对特征进行离散化,即将一定范围内的值映射成一个值,例如对用户年龄特征,将0-10映射到0,11-18映射到1,19-25映射到2,25-30映射到3,等等类似,然后在决策树构建时使用映射后的值计算信息增益。
本小节要求实现特征离散化,请补全相关代码
discrete_df = df.copy() # 先复制一份数据def get_discrete_conf(df):Sheet_discrete_list = []for c in df.columns: # 遍历每一列特征,跳过标签列(label)if c == 'blueWins':continueSheet_discrete_list.append([c, get_discrete_conf_c(df, c)])Sheet_discrete_dict = dict(Sheet_discrete_list)print(Sheet_discrete_dict)return Sheet_discrete_dictdef get_discrete_conf_c(df, c, field_num = 5):min_ = min(df[c])max_ = max(df[c])field = []action = 1normal = 0if len(set(list(df[c])))<=5:action = 0discrete_conf = {"action" : action,"normal" : normal,"min" : min_,"max" : max_,#"type" : type_,"field" : field,}return discrete_conffield = [-500, min(min(df[c]), df.quantile(.25)[c]), max(min(df[c]), df.quantile(.25)[c]), df.quantile(.5)[c], df.quantile(.75)[c]]discrete_conf = {"action" : action,"normal" : normal,"min" : min_,"max" : max_,#"type" : type_,"field" : field,}return discrete_confdef get_discrete_column(discrete_conf, c, df_c):if discrete_conf[c]['action'] == 0:return df_cvalues = discrete_conf[c]["field"]for i in range(len(df_c)):for j in values:if df_c[i]>=j:df_c[i] = jreturn df_cdef get_discrete_data(discrete_conf, df):for c in df.columns:if c not in discrete_conf:continuediscrete_column = get_discrete_column(discrete_conf, c, list(df[c]))df[c] = discrete_columnreturn dfdiscrete_conf = get_discrete_conf(df)discrete_df = get_discrete_data(discrete_conf, df)print(discrete_df)#discrete_df.drop(labels='blueWins',axis=1)'''for c in df.columns[1:]: # 遍历每一列特征,跳过标签列(label)print(c)print(Sheet_discrete_dict[c])for index, row in df.iterrows():#print(index)#print(row[c])tmp = get_discrete_value(Sheet_discrete_dict[c], row[c])#print(tmp)discrete_df[c][index] = tmpdiscrete_df.to_csv('data.csv')'''
数据集准备
构建机器学习模型前要构建训练和测试的数据集。在本例中首先需要分开标签和特征,标签是不能作为模型的输入特征的,就好比作业和试卷答案不能在做题和考试前就告诉学生。测试一个模型在一个任务上的效果至少需要训练集和测试集,训练集用来训练模型的参数,好比学生做作业获得知识,测试集用来测试模型效果,好比期末考试考察学生学习情况。测试集的样本不应该出现在训练集中,否则会造成模型效果估计偏高,好比考试时出的题如果是作业题中出现过的,会造成考试分数不能准确衡量学生的学习情况,估计值偏高。划分训练集和测试集有多种方法,下面首先介绍的是随机取一部分如20%作测试集,剩下作训练集。sklearn提供了相关工具函数train_test_split。sklearn的输入输出一般为numpy的array矩阵,需要先将pandas的DataFrame取出为numpy的array矩阵。
all_y = discrete_df['blueWins'].values # 所有标签数据feature_names = discrete_df.columns[1:] # 所有特征的名称print("feature_names:",feature_names)print("feature_len:",len(feature_names))all_x = discrete_df[feature_names].values # 所有原始特征值,pandas的DataFrame.values取出为numpy的array矩阵# 划分训练集和测试集x_train, x_test, y_train, y_test = train_test_split(all_x, all_y, test_size=0.2, random_state=RANDOM_SEED)all_y.shape, all_x.shape, x_train.shape, x_test.shape, y_train.shape, y_test.shape # 输出数据行列信息
决策树模型的实现
本小节要求实现决策树模型,请补全算法代码
# 定义决策树类from math import logimport operatordef majorityCnt(classList):classCount = {}for vote in classList: #统计classList中每个元素出现的次数if vote not in classCount.keys():classCount[vote] = 0classCount[vote] += 1sortedClassCount = sorted(classCount.items(), key = operator.itemgetter(1), reverse = True) #根据字典的值降序排序return sortedClassCount[0][0] #返回classList中出现次数最多的元素def splitDataSet(dataSet, axis, value):retDataSet = [] #创建返回的数据集列表for featVec in dataSet: #遍历数据集if featVec[axis] == value:reducedFeatVec = featVec[:axis] #去掉axis特征reducedFeatVec.extend(featVec[axis+1:]) #将符合条件的添加到返回的数据集retDataSet.append(reducedFeatVec)return retDataSet #返回划分后的数据集def calcShannonEnt(dataSet):numEntires = len(dataSet) #返回数据集的行数labelCounts = {} #保存每个标签(Label)出现次数的字典for featVec in dataSet: #对每组特征向量进行统计currentLabel = featVec[-1] #提取标签(Label)信息if currentLabel not in labelCounts.keys(): #如果标签(Label)没有放入统计次数的字典,添加进去labelCounts[currentLabel] = 0labelCounts[currentLabel] += 1 #Label计数shannonEnt = 0.0 #经验熵(香农熵)for key in labelCounts: #计算香农熵prob = float(labelCounts[key]) / numEntires #选择该标签(Label)的概率shannonEnt -= prob * log(prob, 2) #利用公式计算return shannonEnt #返回经验熵(香农熵)def chooseBestFeatureToSplit(dataSet):numFeatures = len(dataSet[0]) - 1 #特征数量baseEntropy = calcShannonEnt(dataSet) #计算数据集的香农熵bestInfoGain = 0.0 #信息增益bestFeature = -1 #最优特征的索引值for i in range(numFeatures): #遍历所有特征#获取dataSet的第i个所有特征featList = [example[i] for example in dataSet]uniqueVals = set(featList) #创建set集合{},元素不可重复newEntropy = 0.0 #经验条件熵for value in uniqueVals: #计算信息增益subDataSet = splitDataSet(dataSet, i, value) #subDataSet划分后的子集#print(subDataSet)prob = len(subDataSet) / float(len(dataSet)) #计算子集的概率newEntropy += prob * calcShannonEnt(subDataSet) #根据公式计算经验条件熵infoGain = baseEntropy - newEntropy #信息增益#print("第%d个特征的增益为%.3f" % (i, infoGain)) #打印每个特征的信息增益if (infoGain > bestInfoGain): #计算信息增益bestInfoGain = infoGain #更新信息增益,找到最大的信息增益bestFeature = i #记录信息增益最大的特征的索引值return bestFeature #返回信息增益最大的特征的索引值def traverse_node(inputTree, featLabels, testVec):firstStr = next(iter(inputTree)) #获取决策树结点secondDict = inputTree[firstStr] #下一个字典featIndex = featLabels.index(firstStr)for key in secondDict.keys():if testVec[featIndex] == key:if type(secondDict[key]).__name__ == 'dict':classLabel = traverse_node(secondDict[key], featLabels, testVec)else:classLabel = secondDict[key]return classLabelclass DecisionTree(object):def __init__(self, classes, features,max_depth=10, min_samples_split=10,impurity_t='entropy'):'''传入一些可能用到的模型参数,也可能不会用到classes表示模型分类总共有几类features是每个特征的名字,也方便查询总的共特征数max_depth表示构建决策树时的最大深度min_samples_split表示构建决策树分裂节点时,如果到达该节点的样本数小于该值则不再分裂impurity_t表示计算混杂度(不纯度)的计算方式,例如entropy或gini'''self.classes = classesself.features = featuresself.max_depth = max_depthself.min_samples_split = min_samples_splitself.impurity_t = impurity_tself.root = None # 定义根节点,未训练时为空'''请实现决策树算法,使得fit函数和predict函数可以正常调用,跑通之后的测试代码,要求之后测试代码输出的准确率大于0.6。提示:可以定义额外一些函数,例如impurity()用来计算混杂度gain()调用impurity用来计算信息增益expand_node()训练时递归函数分裂节点,考虑不同情况1. 无需分裂 或 达到分裂阈值2. 调用gain()找到最佳分裂特征,递归调用expand_node3. 找不到有用的分裂特征fit函数调用该函数返回根节点traverse_node()预测时遍历节点,考虑不同情况1. 已经到达叶节点,则返回分类结果2. 该特征取值在训练集中未出现过3. 依据特征取值进入相应子节点,递归调用traverse_node当然也可以有其他实现方式。'''def fit(self, dataSet, labels, featLabels = []):print(len(labels))print(len(dataSet[0]))print(dataSet[0])assert len(labels) == (len(dataSet[0])-1) # 输入数据的特征数目应该和模型定义时的特征数目相同classList = [example[-1] for example in dataSet]if len(dataSet[0]) <= self.min_samples_split: #遍历完所有特征时返回出现次数最多的类标签return majorityCnt(classList)if (len(self.features)-len(labels))>=self.max_depth-2:return majorityCnt(classList)if classList.count(classList[0]) == len(classList): #如果类别完全相同则停止继续划分return classList[0]if len(dataSet[0]) == 1: #遍历完所有特征时返回出现次数最多的类标签return majorityCnt(classList)bestFeat = chooseBestFeatureToSplit(dataSet) #选择最优特征bestFeatLabel = labels[bestFeat] #最优特征的标签featLabels.append(bestFeatLabel)myTree = {bestFeatLabel:{}} #根据最优特征的标签生成树del(labels[bestFeat]) #删除已经使用特征标签featValues = [example[bestFeat] for example in dataSet] #得到训练集中所有最优特征的属性值uniqueVals = set(featValues) #去掉重复的属性值for value in uniqueVals:subLabels=labels[:]#递归调用函数createTree(),遍历特征,创建决策树。myTree[bestFeatLabel][value] = DT.fit(splitDataSet(dataSet, bestFeat, value), subLabels, featLabels)return myTreedef predict(self, inputTree, features, dataSet):assert len(dataSet.shape) == 1 or len(dataSet.shape) == 2 # 只能是1维或2维if len(dataSet.shape) == 1:return traverse_node(inputTree, features, dataSet)else:return np.array([traverse_node(inputTree, features, f) for f in dataSet])'''预测输入feature可以是一个一维numpy数组也可以是一个二维numpy数组如果是一维numpy(m)数组则是一个样本,包含m个特征,返回一个类别值如果是二维numpy(n*m)数组则表示n个样本,每个样本包含m个特征,返回一个numpy一维数组提示:一种可能的实现方式为if len(feature.shape) == 1: # 如果是一个样本return self.traverse_node(self.root, feature) # 从根节点开始路由return np.array([self.traverse_node(self.root, f) for f in feature]) # 如果是很多个样本'''# 定义决策树模型,传入算法参数DT = DecisionTree(classes=[0,1], features=feature_names.tolist(), max_depth=5, min_samples_split=10, impurity_t='gini')featLabels = []#myTree = createTree(dataSet, label, features, featLabels)#features = feature_names#print(len(features))#dataSet = list(x_train)#print(len(dataSet[0]))dataSet = np.hstack([np.matrix(x_train), np.matrix(y_train).T])print(dataSet.shape)dataSet = dataSet.tolist()print(len(feature_names))myTree = DT.fit(dataSet, feature_names.tolist()) # 在训练集上训练print(myTree)p_test = DT.predict(myTree, list(feature_names), x_test) # 在测试集上预测,获得预测值print(p_test) # 输出预测值#test_acc = accuracy_score(p_test, y_test) # 将测试预测值与测试集标签对比获得准确率#print('accuracy: {:.4f}'.format(test_acc)) # 输出准确率
模型效果计算
第一次模型测试结果可能不够好,可以先检查调试代码是否有bug,再尝试调整参数或者优化计算方法。
Err = p_test-y_testErrNum = 0for i in range(len(Err)):if Err[i] != 0:ErrNum = ErrNum + 1ErrRate = float(ErrNum)/len(Err)print(ErrRate)print(1-ErrRate)
