TensorFlow model training and saving: MNIST OCR

Published 2023-08-22 09:24:01  Author: 哈库拉
import tensorflow as tf
from tensorflow.python.framework.convert_to_constants import convert_variables_to_constants_v2
from tensorflow.python.tools import optimize_for_inference_lib

import numpy as np
import matplotlib.pyplot as plt
import datetime
import cv2
import os
import imgaug as ia
import imgaug.augmenters as iaa
seq = iaa.Sequential([
    # iaa.Fliplr(0.5), # horizontal flips
    # iaa.Crop(percent=(0, 0.1)), # random crops
    # Small gaussian blur with random sigma between 0 and 0.5.
    # But we only blur about 50% of all images.
    iaa.Sometimes(
        0.5,
        iaa.GaussianBlur(sigma=(0, 0.5))
    ),
    # Strengthen or weaken the contrast in each image.
    iaa.LinearContrast((0.75, 1.5)),
    # Add gaussian noise.
    # For 50% of all images, we sample the noise once per pixel.
    # For the other 50% of all images, we sample the noise per pixel AND
    # channel. This can change the color (not only brightness) of the
    # pixels.
    iaa.AdditiveGaussianNoise(loc=0, scale=(0.0, 0.05*255), per_channel=0.5),
    # Make some images brighter and some darker.
    # In 20% of all cases, we sample the multiplier once per channel,
    # which can end up changing the color of the images.
    iaa.Multiply((0.8, 1.2), per_channel=0.2),
    # Apply affine transformations to each image.
    # Scale/zoom them, translate/move them, rotate them and shear them.
    # iaa.Affine(
    #     scale={"x": (0.8, 1.2), "y": (0.8, 1.2)},
    #     translate_percent={"x": (-0.2, 0.2), "y": (-0.2, 0.2)},
    #     rotate=(-25, 25),
    #     shear=(-8, 8)
    # )
], random_order=True) # apply augmenters in random order
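# A minimal preview sketch for the pipeline above (assumption: imgaug >= 0.4,
# which supports the seq(image=...) single-image call, and a hypothetical
# local sample file "sample.png"):
# img = cv2.imread("sample.png", 0)
# img_aug = seq(image=img)              # augment a single image
# cv2.imwrite("sample_aug.png", img_aug)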

mnist=tf.keras.datasets.mnist

tiandict = {}
def convertLabel2Num():
    label = "d:/ocr/tkeysv1.txt"
    with open(label, 'r') as f:
        labels = f.readlines()
    labels = [x.strip() for x in labels]
    
    for (i,x) in enumerate(labels):
        tiandict[x] = i
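# Assumption about the label file: tkeysv1.txt lists one class symbol per line
# (16 lines to match the 16-way softmax below, e.g. digits plus a few letters
# such as "0" and "B"); tiandict maps each symbol to its zero-based line index.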


def train_dataLoad(mode="train"):
    # testimg= "d:/ocr/test"
    trainimg= "d:/ocr/{}".format(mode)
    trainlist = "d:/ocr/data_{}.txt".format(mode)
    with open(trainlist, 'r') as f:
        imgs = f.readlines()

    imgs = [x.strip() for x in imgs]
    labels = [x.strip().split("_")[1].split(".")[0] for x in imgs ]
    Ncount_0 = len([x for x in labels if x == "0"])
    Ncount_B = len([x for x in labels if x == "B"])
    if (mode == "train"):
        Ntrain = len(imgs)+Ncount_0+Ncount_B
    else:
        Ntrain = len(imgs)
    x_train= np.zeros((Ntrain,28,28), dtype = np.uint8)
    y_train = np.zeros((Ntrain,),dtype=np.uint8)
    xi = 0
    for (i,x) in enumerate(imgs):
        fullpath = os.path.join(trainimg, x)
        image = cv2.imread(fullpath,0)
        # image_aug = seq(image)
        x_train[i,:,:] = image
        y_train[i] = tiandict[labels[i] ]
        if (mode == "train"):
            if(labels[i] == "0" or labels[i] =="B"):
                ret,thre = cv2.threshold(image,100,255,cv2.THRESH_BINARY)
                x_train[len(imgs)+xi] = thre 
                y_train[len(imgs)+xi] = tiandict[labels[i] ]
                xi+=1


    if(mode == "train"):
        images_aug = seq(images=x_train)
        x_train = images_aug
    # a = images_aug[0,:,:]
    return x_train,y_train
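# File-naming convention implied by the label parsing above: <id>_<label>.<ext>,
# e.g. a hypothetical "0013_B.png" yields the label "B"
# via x.split("_")[1].split(".")[0].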

convertLabel2Num()

x_train,y_train,x_test,y_test = None,None,None,None
x_validate,y_validate=None,None

def loadAllData():
    global  x_train,y_train,x_test,y_test,x_validate,y_validate
    x_testTF, y_testTF = train_dataLoad("test")
    x_trainTF, y_trainTF = train_dataLoad("train")
    x_train,y_train,x_test,y_test=x_trainTF, y_trainTF,x_testTF,y_testTF
    # Load the data: training and test sets (the stock MNIST split is 60k train / 10k test)
    # Downloaded over the network:
    # (x_train,y_train),(x_test,y_test)=mnist.load_data()
    # INPUT layer: images are uniformly padded to 32*32.
    x_train= np.pad(x_train,((0,0),(2,2),(2,2)),'constant',constant_values=0) # 28*28 -> 32*32
    x_test= np.pad(x_test,((0,0),(2,2),(2,2)),'constant',constant_values=0) # 28*28 -> 32*32
    print(x_train.shape,x_test.shape, y_train.shape)

    # Dataset type conversion
    # x_train=x_train.astype('float32')
    # x_test=x_test.astype('float32')

    # Normalize so the input values stay within the range expected by the
    # hidden- and output-layer activations.
    x_train=x_train/255 # normalize
    x_test=x_test/255 # normalize

    x_trainAll=x_train.reshape(x_train.shape[0],32,32,1)
    y_trainAll=y_train.reshape(y_train.shape[0],1)
    Ntrain = int(x_train.shape[0]*0.9)
    x_train = x_trainAll[:Ntrain]
    y_train = y_trainAll[:Ntrain]
    # The train split is [:Ntrain], so validation must start at Ntrain
    # (the original Ntrain+1 skipped one sample)
    x_validate = x_trainAll[Ntrain:]
    y_validate = y_trainAll[Ntrain:,0]
    x_test=x_test.reshape(x_test.shape[0],32,32,1)
    print(x_train.shape,x_test.shape, x_validate.shape, y_validate.shape)

def LeNetModel():
    # Instantiate the model following the LeNet architecture
    model=tf.keras.models.Sequential([
        tf.keras.layers.Conv2D(filters=6,kernel_size=(5,5),padding='valid',activation=tf.nn.relu,input_shape=(32,32,1)),
        tf.keras.layers.AveragePooling2D(pool_size=(2,2),strides=(2,2),padding='same'),
        tf.keras.layers.Conv2D(filters=16,kernel_size=(5,5),padding='valid',activation=tf.nn.relu), # input_shape is only needed on the first layer
        tf.keras.layers.AveragePooling2D(pool_size=(2,2),strides=(2,2),padding='same'),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(units=120,activation=tf.nn.relu),
        tf.keras.layers.Dense(units=84,activation=tf.nn.relu),
        tf.keras.layers.Dense(units=16,activation=tf.nn.softmax),
        ])
    return model
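# Shape walk-through for LeNetModel (assuming the 32x32x1 input declared above):
#   Conv2D(6, 5x5, valid):   32x32x1  -> 28x28x6
#   AveragePooling2D(2x2):   28x28x6  -> 14x14x6
#   Conv2D(16, 5x5, valid):  14x14x6  -> 10x10x16
#   AveragePooling2D(2x2):   10x10x16 -> 5x5x16
#   Flatten:                 5*5*16 = 400
#   Dense:                   400 -> 120 -> 84 -> 16 (softmax over the 16 classes)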

def train():
    num_epochs=30 # number of training epochs
    batch_size=800 # images fed per batch
    lr=0.001 # learning rate
    model = LeNetModel()
    model.summary() # print the model summary
    # Optimizer
    adam_optimizer=tf.keras.optimizers.Adam(lr)
    
    model.compile(
            optimizer=adam_optimizer,
            loss=tf.keras.losses.sparse_categorical_crossentropy,
            metrics=['accuracy']
        )
    
    start_time=datetime.datetime.now() # training start time
    
    model.fit(x=x_train,y=y_train,batch_size=batch_size,epochs=num_epochs)
    end_time=datetime.datetime.now() # training end time
    time_cost=end_time-start_time # total training time
    print('time_cost: ',time_cost)
    model.save('leNet_model.h5') # save the model in HDF5 format
    print("save to pb file")
    model.save("tian/", save_format="tf")
    print("----------------------")
    print(x_test.shape, y_test.shape, x_validate.shape, y_validate.shape)
    print(model.evaluate(x_test,y_test))
    print(model.evaluate(x_validate,y_validate))
    print("Finished!")


def pred_oneImg():
    model=tf.keras.models.load_model('leNet_model.h5')
    image = cv2.imread("d:/ocr/minist/9.jpg",0)
    # Match the training preprocessing: scale pixel values into [0, 1]
    pred=model.predict(image.reshape(1,32,32,1)/255.0)
    print("pred: ", pred, pred.argmax())

def pred_function():
    model=tf.keras.models.load_model('leNet_model.h5')
     
    i=0
    Ncorrect = 0
    for xx in x_validate:
        # Predict
        print(xx.shape)
        image = xx
        label = y_validate[i]
        pred=model.predict(image.reshape(1,32,32,1))
        #print(type(label[0]), type(pred.argmax()))
        if(label==pred.argmax()):
            Ncorrect+=1
        print("label:",label,"predict result:",pred.argmax(),"accu:", Ncorrect/(i+1))

        #cv2.imwrite("{}.jpg".format(label), image[:,:,0]*255)
        i+=1
         
        # Display
        if(0):
            plt.imshow(image.reshape(32,32))
            plt.savefig("predict_num.jpg")
            plt.show()
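    # Vectorized alternative sketch: score the whole validation split in one
    # predict() call instead of one call per image (same model, same data):
    # preds = model.predict(x_validate)                  # shape (N, 16)
    # acc = np.mean(preds.argmax(axis=1) == y_validate)
    # print("validation accuracy:", acc)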

def test_x():
    model = tf.keras.models.load_model('tian/')
    #model=tf.keras.models.load_model('leNet_model.h5')
    print("load model success") 
    print(model.evaluate(x_test,y_test))
    print(model.evaluate(x_validate,y_validate))

def exportFrozenGraph(): 
    # The Keras model, after loading
    model = tf.keras.models.load_model('tian/')
    frozen_out_path = './tian_V1'  # directory to store the frozen model
    # name of the .pb file
    frozen_graph_filename = "frozen_graph"  # model name
    
    # Convert Keras model to ConcreteFunction
    full_model = tf.function(lambda x: model(x))
    full_model = full_model.get_concrete_function(
        tf.TensorSpec(model.inputs[0].shape, model.inputs[0].dtype))
    
    # Get frozen ConcreteFunction
    frozen_func = convert_variables_to_constants_v2(full_model)
    graph_def = frozen_func.graph.as_graph_def()

    # Remove NoOp nodes
    for i in reversed(range(len(graph_def.node))):
        if graph_def.node[i].op == 'NoOp':
            del graph_def.node[i]

    for node in graph_def.node:
        for i in reversed(range(len(node.input))):
            if node.input[i][0] == '^':
                del node.input[i]

    # Remove a lot of Identity nodes
    graph_def = optimize_for_inference_lib.optimize_for_inference(graph_def,
                                                                ['x'],
                                                                ['Identity'],
                                                                tf.float32.as_datatype_enum)

    # Export frozen graph
    with tf.io.gfile.GFile('frozen_graph_optm.pb', 'wb') as f:
        f.write(graph_def.SerializeToString())
    
    return
    # NOTE: the early return above skips the diagnostics and the standard
    # write_graph export below; they are kept for reference.

    layers = [op.name for op in frozen_func.graph.get_operations()]
    print("-" * 60)
    print("Frozen model layers: ")
    for layer in layers:
        print(layer)
    print("-" * 60)
    print("Frozen model inputs: ")
    print(frozen_func.inputs)    # model inputs
    print("Frozen model outputs: ")
    print(frozen_func.outputs)  # model outputs
    
    # Save the frozen PB model
    # Save frozen graph to disk
    tf.io.write_graph(graph_or_graph_def=frozen_func.graph,
                      logdir=frozen_out_path,
                      name=f"{frozen_graph_filename}.pb",
                      as_text=False)
    # Save its text representation
    tf.io.write_graph(graph_or_graph_def=frozen_func.graph,
                      logdir=frozen_out_path,
                      name=f"{frozen_graph_filename}.pbtxt",
                      as_text=True)

def inferOneByCVLoadPb():
    net = cv2.dnn.readNet("frozen_graph_optm.pb")
    image = cv2.imread("d:/ocr/minist/9.jpg",0)
    # pred=model.predict(image.reshape(1,32,32,1))
    # Match the training preprocessing: NCHW layout, float32, scaled into [0, 1]
    net.setInput(image.reshape(1,1,32,32).astype(np.float32)/255.0)
    out = net.forward()
    print(out.shape, out)
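    # Alternative preprocessing sketch: cv2.dnn.blobFromImage builds the
    # NCHW float blob and applies the 1/255 scaling in one call:
    # blob = cv2.dnn.blobFromImage(image, scalefactor=1/255.0, size=(32, 32))
    # net.setInput(blob)
    # out = net.forward()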
    
#loadAllData()
#train()
#pred_function()
#pred_oneImg()
#test_x()
# exportFrozenGraph()
# delete_ops_from_graph()
inferOneByCVLoadPb()

The code above covers:

1. OCR model training, saving, and forward inference

2. Model freezing (frozen-graph export) with removal of redundant nodes