基于朴素贝叶斯的中文垃圾电子邮件分类

2020年5月6日 · 1684 字 · 4 分钟 · ML

基于朴素贝叶斯的中文垃圾电子邮件分类

训练数据和测试数据

本次主要使用了github上的开源数据

数据处理

首先使用正则表达式对训练集中的中文邮件的内容进行过滤,去除其中中文之外的其他字符,再将剩下的内容使用jieba进行分词,过滤掉中文停用字表中的内容。将垃圾邮件以及正常邮件的处理结果分别存放。

用两个字典spam_voca和normal_voca用于保存不同邮件中不同词语的词频,数据处理完成。

训练与预测

训练和预测的过程主要使计算$ P(Spam|word_1,word_2,…,word_n) $的概率,当这个概率大于某个阈值的时候,这个邮件被分类为垃圾邮件。

根据朴素贝叶斯的条件独立假设,并设先验概率$P(s)=P(s^{’})=0.5$,则有: P(s|w_1,w_2,…,w_n)=$\frac {P(s,w_1,w_2,…,w_n)}{P(w_1,w_2,…,w_n)}$

=$\frac {P(w_1,w_2,…,w_n|s)P(s)}{P(w_1,w_2,…,w_n)}$=$\frac {P(w_1,w_2,…,w_n|s)P(s)}{P(w_1,w_2,…,w_n|s)\cdot p(s)+P(w_1,w_2,…,w_n|s^{’})\cdot p(s^{’})}\qquad\qquad$

因为P(spam)=P(not spam),则有

$\frac {\prod\limits_{j=1}^nP(w_j|s)}{\prod\limits_{j=1}^nP(w_j|s)+\prod\limits_{j=1}^nP(w_j|s^{’})}$ 再利用贝叶斯$P(w_j|s)=\frac{P(s|w_j)\cdot P(w_j)}{P(s)}$,式子化为 $\frac {\prod\limits_{j=1}^nP(s|w_j)}{\prod\limits_{j=1}^nP(s|w_j)+\prod\limits_{j=1}^nP(s^{’}|w_j)}$

具体流程

  • 对测试集中的每一封邮件做同样的处理,并计算得到$P(s|w)$最高的n个词,在计算过程中,若该词只出现在垃圾邮件的词典中,则令$P(w|s^{’})=0.01$,反之亦然;若都未出现,则令$P(s|w)=0.4$。PS.这里做的几个假设基于前人做的一些研究工作得出的。
  • 对得到的每封邮件中重要的15个词利用式2计算概率,若概率$>$阈值$\alpha(一般设为0.9)$,则判为垃圾邮件,否则判为正常邮件。

具体的代码细节可以参见附加的代码。

结果

通过调整预测是使用的词汇的数量,针对本次数据集,最佳的结果是

选择29个词汇 : 0.9642857142857143

附加

  • 项目结构 ├── data │ ├── 中文停用词表.txt │ ├── normal │ ├── spam │ └── test ├── main.py ├── normal_voca.json ├── pycache │ └── utils.cpython-36.pyc ├── spam_voca.json └── utils.py

  • 具体代码

    # utils.py
    import jieba
    import numpy
    import re
    import os
    import json
    from collections import defaultdict
    
    spam_file_num=7775 
    normal_file_num=7063
    
    #todo 原始邮件数据的预处理
    '''
    获取中文的停用词表
    【停用词】(https://zh.wikipedia.org/wiki/%E5%81%9C%E7%94%A8%E8%AF%8D)
    '''
    
    def get_stopwords():
        return [
            i.strip() for i in open('./data/中文停用词表.txt', encoding='gbk')
        ]
    
    
    '''
        读取原始文件,进行简单的处理和分词,返回分词列表
    
    '''
    
    
    def get_raw_str_list(path):
        stop_list = get_stopwords()
        with open(path, encoding='gbk') as f:
            raw_str = f.read()
        pattern = '[^\u4E00-\u9FA5]'  #中文的unicode编码的范围
        regex = re.compile(pattern)
        ret = regex.findall(raw_str)
        handled_str = re.sub(pattern, '', raw_str)
        str_list = [
            word for word in jieba.cut(handled_str) if word not in stop_list
        ]
        return str_list
    
    #分词以及统计词频,得到词汇表
    '''
    返回一个字典,记录了分词后的词汇表
    path:k可以使dir或者file路径,默认为存放文本的dir
    is_file_path:表示输入路径是不是文件
    
    '''
    
    
    def get_voca(path, is_file_path=False):
        if is_file_path:
            return read_voca_from_file(path)
    
        voca = defaultdict(int)
        # 获取垃圾邮件列表
        file_list = [file for file in os.listdir(path)]
        #获得词汇表
        for file in file_list:  #测试用
            raw_str_list = get_raw_str_list(path + str(file))
            for raw_str in raw_str_list:
                voca[raw_str] = voca[raw_str] + 1
    
        return voca
    
    
    
    '''
        将得到的词汇表保存到json文件中,以便下次读取
        voca:词汇表字典
        path:文件的保存路径
        sort_by_value:是否按value值排序
    
    '''
    
    
    def save_voca2json(voca, path, sort_by_value=False,indent_=4):
        if sort_by_value:
            sorted_by_value(voca)
    
        with open(path, 'w+') as f:
            f.write(json.dumps(voca, ensure_ascii=False, indent=indent_))
    
    
    '''
    从json文件中读取voca
    '''
    
    def read_voca_from_file(path):
        voca = None
        with open(path) as f:
            voca = json.load(f)
        return voca
    
    
    '''
    将字典基于value排序
    '''
    
    def sorted_by_value(_dict):
        _dict = dict(sorted(spam_voca.items(), key=lambda x: x[1], reverse=True))
    
    
    #计算 P(Spam|word)
    
    '''
        计算邮件和邮件分类最相关词语及其 P(spam|word)
        words_size:最终使用词语的数量,用于最后的预测
    
    '''
    
    def get_top_words_prob(path,spam_voca,normal_voca,words_size=30):
        critical_words=[]
        for word in get_raw_str_list(path):
            if word in spam_voca.keys() and word in normal_voca.keys():
                # 如果word在两边都出现
                p_w_s=spam_voca[word]/spam_file_num
                p_w_n=normal_voca[word]/normal_file_num
                p_s_w=p_w_s/(p_w_n+p_w_s)
    
            elif word in spam_voca.keys() and word not in normal_voca.keys():
                # 如果word只在spam出现
                p_w_s=spam_voca[word]/spam_file_num
                p_w_n=0.01
                p_s_w=p_w_s/(p_w_n+p_w_s)
    
            elif word not in spam_voca.keys() and word in normal_voca.keys():
                # 如果word只在normal出现
                p_w_s=0.01
                p_w_n=normal_voca[word]/normal_file_num
                p_s_w=p_w_s/(p_w_n+p_w_s)
            else:
                #如果都不出现
                p_s_w=0.4
            #以上设定的数值均是基于前人的研究得到的数据
            critical_words.append([word,p_s_w])
    
        return dict(sorted(critical_words[:words_size],key=lambda x:x[1],reverse=True))
    
    '''
        计算贝叶斯概率
        words_prob:包含词汇以及其P(spam|word)概率的一个字典
        spam_voca:垃圾邮件词汇表
        normal_voca:正常邮件词汇表
    '''
    
    
    def caculate_bayes(words_prob,spam_voca,normal_voca):
        p_s_w=1
        p_s_nw=1
        for word,prob in words_prob.items():
            p_s_w*=prob
            p_s_nw*=(1-prob)
    
        return p_s_w/(p_s_w+p_s_nw)
    
    def predict(bayes,threshold=0.9):
        return bayes>=threshold    
    
    
    '''
        返回文件名和label
    '''
    
    def get_files_labels(dir_path,is_spam=True):
        raw_files_list=os.listdir(dir_path)
        files_list= [dir_path+file for file in raw_files_list]
        labels=[is_spam for i in range(len(files_list))]
        return files_list,labels
    
    
    '''
        预测测试集结果并打印
        file_list:测试集文件路径集合
        y:测试集结果标签
        word_size:预测的指标
    '''
    
    def predict_result(file_list,y,spam_voca,normal_voca,word_size=30):
        ret=[]
        right=0
        for file in file_list:
    #         raw_strs=get_raw_str_list(str(file))
            words_prob=get_top_words_prob(file,spam_voca,normal_voca,words_size=word_size)
            bayes=caculate_bayes(words_prob,spam_voca,normal_voca)
            ret.append(predict(bayes))
        for i in range(len(ret)):
            if ret[i]==y[i]:
                right+=1
    
        print(right/len(y))
    
    #main.py
    from utils import *
    
    if __name__=='__main__':
        #获取词典并保存,一遍下次读取
        spam_voca=get_voca('./spam_voca.json',is_file_path=True)
        normal_voca=get_voca('./normal_voca.json',is_file_path=True)
        #保存方便下次读取
        save_voca2json(spam_voca,'./spam_voca.json')
        save_voca2json(normal_voca,'./snormal_voca.json')
        #预处理测试数据
        s_x,s_y=get_files_labels('./data/test/spam/')
        n_x,n_y=get_files_labels('./data/test/normal/',is_spam=False)
        x,y=list(s_x+n_x),s_y+n_y
        #预测结果
    
        for i in range(10,40):
            print(str(i)+' : ',end='')
            predict_result(x,y,spam_voca,normal_voca,word_size=i)
    

源码

github