Python项目之ATM1.2多模块二版-json文件+功能完善

发布时间 2023-12-20 17:13:59作者: Lea4ning

ATM1.2多模板二版

Github仓库(目前是私人)

image-20231220170853167

image-20231220165958700

功能介绍

func_menu = 
'''
========================ATM=========================
    1.注册            2.登录             3.激活银行卡
    4.取款            5.转账             6.充值
    7.个人日志        8.个人流水          9.个人信息
    10.退出系统                                     
======================欢迎使用=======================
'''

代码展示

bin

# 启动文件
# bin/main.py
from conf import config  # 导入配置文件
from core.system_src import main_system

if __name__ == '__main__':
    main_system()

conf

# 配置文件
# conf/settings.py
# 配置文件
# 配置路径
import os
import time
import json

# 文件目录根路径
BASE_DIR = os.path.dirname(os.path.dirname(__file__))

# 数据文件的根路径
DB_DIR = os.path.join(BASE_DIR, 'database')
LOG_DIR = os.path.join(BASE_DIR, 'log')

# 细分数据文件夹
# user_pwd.json  # 用户名 + 密码
USER_DATA_DIR = os.path.join(DB_DIR, 'all_user_data')
# {username}_bank_info.json  # 各用户的银行信息 # 银行卡号 # 交易密码 # 所剩余额
BANK_DATA_DIR = os.path.join(DB_DIR, 'bank_data')
# {username}_bank_log.json   # 各用户的银行日志 # 用户姓名 # 登录日志 # 登录时间
LOG_LOG_DIR = os.path.join(LOG_DIR, 'user_log')
# {username}_bank_flow.json  # 各用户的银行流水 # 用户姓名 # 交易内容 # 余额变动 # 交易时间
LOG_FLOW_DIR = os.path.join(LOG_DIR, 'user_flow')

# 创建文件目录列表
path_list = []
path_list.append(BASE_DIR)  # 文件目录根路径
path_list.append(DB_DIR)  # 各数据文件根路径
path_list.append(LOG_DIR)  # 各日志文件根路径
path_list.append(USER_DATA_DIR)  # 用户数据存放路径
path_list.append(BANK_DATA_DIR)  # 各用户银行数据存放路径
path_list.append(LOG_LOG_DIR)  # 各用户日志文件存放路径
path_list.append(LOG_FLOW_DIR)  # 各用户流水文件存放路径

# 检查数据文件是否存在,如果不存在,将初始化一个json文件
check_file_path = os.path.join(USER_DATA_DIR, 'user_pwd.json')


def creat_folder(*args):
    try:
        for path in args:
            if not os.path.exists(path):
                os.mkdir(path)
                print(f"所需文件夹 {os.path.basename(path)} 创建成功  >>>>>>  可在{path}查看!")
        if not os.path.exists(check_file_path):
            # 检查数据文件是否存在,如果不存在,将初始化一个json文件
            with open(file=check_file_path, mode='w', encoding='utf8') as fp:
                init_json = {'Init': {'username': '', 'password': ''}}
                json.dump(init_json, fp, ensure_ascii=False)
                fp.flush()
                print(f"所需文件 {os.path.basename(check_file_path)} 创建成功  >>>>>>  可在{check_file_path}查看!")
        print("所需文件全部创建成功,马上进入系统!")
        time.sleep(1)
    except Exception as e:
        print(f"意料之外的错误!错误信息{e}")


config = creat_folder(*path_list)

core

# 核心代码
#  core/bank_src.py


# 处理银行相关内容
# 激活银行卡 # 取款 # 转账 # 充值 # 查看银行信息
# {username}_bank_info.json  # 各用户的银行信息 # 银行卡号 # 交易密码 # 所剩余额
# {username}_bank_log.json   # 各用户的银行日志 # 用户姓名 # 登录日志 # 登录时间
# {username}_bank_flow.json  # 各用户的银行流水 # 用户姓名 # 交易内容 # 余额变动 # 交易时间
import datetime
from database import read_data, save_data, creat_path
from core.user_src import login_user_dict, check_login, creat_flow, creat_log


# 验证格式化
def verify_format(tag):
    def wrapper(func):
        def inner(*args, **kwargs):
            while True:
                print("按Q退出输入".center(50, '-'))
                # 银行卡号录入
                bank_id = input("请输入银行卡号:").strip()
                if bank_id == 'Q':
                    return False, "已退出输入!"
                if not bank_id.isidentifier() and len(bank_id) != 6:
                    print("银行卡号输入格式有误!请输入6位纯数字!")
                    continue
                # 当不是激活银行卡,验证是否与当前登录用户的银行卡信息一致
                elif tag != 'active' and bank_id != login_user_dict['bank_id']:
                    print("银行卡号输入有误!")
                    continue
                count = 0
                if tag != 'recharge':
                    while count < 3:
                        pay_pwd = input("请输入交易密码:").strip()
                        if tag == 'active':
                            reconfirm = input("请重新确认交易密码:").strip()
                            if reconfirm == pay_pwd:
                                break
                            else:
                                print("输入有误,请重新输入!")
                                continue
                        if pay_pwd != login_user_dict['pay_pwd']:
                            print("交易密码输入错误,请重新输入!")
                            count += 1
                            continue
                        else:
                            break
                if tag == 'active':
                    return func(bank_id=bank_id, pay_pwd=pay_pwd)
                balance = input("请输入交易金额:").strip()
                if not balance.isdigit():
                    print("金额输入格式有误,请重新输入!")
                    continue
                balance = int(balance)
                if tag == 'withdraw' or 'transfer':
                    if balance > login_user_dict['balance']:
                        print("余额不足!")
                        continue
                break
            return func(bank_id=bank_id, balance=balance)

        return inner

    return wrapper


def check_bank(tag):
    def check_active(func):
        def inner(*args, **kwargs):
            flag, data = read_data(path=creat_path('bank', path=f"{login_user_dict['username']}_bank_info.json"))
            if data[login_user_dict['username']]['bank_id'] != '' and tag == 'active':
                return False, '请不要重复激活!'
            elif data[login_user_dict['username']]['bank_id'] == '' and tag != 'active':
                return False, '尚未激活银行卡,请先激活银行卡!'
            else:
                return func(*args, **kwargs)

        return inner

    return check_active


# 激活银行卡
@check_login
@check_bank('active')
@verify_format('active')
def activate_bank_id(**kwargs):
    username = login_user_dict['username']
    flag, data = read_data(path=creat_path('bank', path=f"{username}_bank_info.json"))
    bank_id = kwargs.get('bank_id')
    pay_pwd = kwargs.get('pay_pwd')
    # 创建用户字典
    data[username] = {'bank_id': bank_id, 'pay_pwd': pay_pwd, 'balance': 1000}
    # 创建数据
    save_data(data, path=creat_path('bank', f"{username}_bank_info.json"))
    # 写入流水
    txt = f"{username}|激活银行卡|余额:0 - 1000 |时间为{datetime.datetime.now().strftime('%x %X')}"
    creat_flow(txt=txt, username=username)
    creat_log(txt=f"{username}|激活银行卡成功|时间为{datetime.datetime.now().strftime('%x %X')}", username=username)
    return True, "激活银行卡成功!"


# 取款
@check_login
@check_bank('bank')
@verify_format('withdraw')
def withdraw_money(**kwargs):
    login_flag, login_info = read_data(path=creat_path('bank', path=f"{login_user_dict['username']}_bank_info.json"))
    balance = kwargs.get('balance')
    login_info[login_user_dict['username']]['balance'] = login_user_dict['balance'] - balance
    msg = save_data(login_info, path=creat_path('bank', path=f"{login_user_dict['username']}_bank_info.json"))
    print(msg)
    txt_login = f"{login_user_dict['username']}|取款{balance}元|余额:{login_user_dict['balance']}-{login_user_dict['balance'] - balance}|时间为{datetime.datetime.now().strftime('%x %X')}"
    creat_flow(txt=txt_login, username=login_user_dict['username'])
    return True, ''


# 转账
@check_login
@check_bank('bank')
@verify_format('transfer')
def transfer_money(**kwargs):
    # 拿到格式化后的金额
    balance = kwargs.get('balance')
    reconfirm_balance = input(f"请重新确认转账金额{balance}(y/n):").strip()
    if reconfirm_balance.upper() != 'Y':
        return False, ''
    payee = input("请输入收款方用户名:").strip()  # 输入转账用户名
    flag, data_pwd = read_data(path=creat_path('pwd', path=f"user_pwd.json"))
    if payee not in data_pwd.keys():
        return False, '对方尚未注册,请提示对方先注册!'
    flag_info, data_info = read_data(path=creat_path('bank', path=f"{payee}_bank_info.json"))
    login_flag, login_info = read_data(path=creat_path('bank', path=f"{login_user_dict['username']}_bank_info.json"))
    # 2次机会输入
    for i in range(2):
        payee_bank_id = input("请输入收款方银行卡号:").strip()
        if payee_bank_id != data_info[payee]['bank_id']:
            print("收款方银行卡号输入有误,请重新输入!")
            continue
        else:
            break
    else:
        return False, "转账失败!"
    # 数据验证过后,进行数据的更新  # 交易金额balance  # 收款对象payee
    # 数据字典 : 当前登录用户 余额减去交易金额 当前登录用户的其他信息在login_user_dict中
    # 更新支付方余额信息
    login_info[login_user_dict['username']]['balance'] = login_user_dict['balance'] - balance
    # 更新收款方的余额信息
    data_info[payee]['balance'] = data_info[payee]['balance'] + balance
    msg = save_data(login_info, path=creat_path('bank', path=f"{login_user_dict['username']}_bank_info.json"))
    print(msg)
    msg = save_data(data_info, path=creat_path('bank', path=f"{payee}_bank_info.json"))
    print(msg)
    # 增加流水
    txt_login = f"{login_user_dict['username']}|转账给{payee} {balance}元|余额:{login_user_dict['balance']}-{login_user_dict['balance'] - balance}|时间为{datetime.datetime.now().strftime('%x %X')}"
    creat_flow(txt=txt_login, username=login_user_dict['username'])
    txt_payee = f"{payee}|收到{login_user_dict['username']}转账{balance}元|余额:{data_info[payee]['balance']}-{data_info[payee]['balance'] + balance}|时间为{datetime.datetime.now().strftime('%x %X')}"
    creat_flow(txt=txt_payee, username=payee)
    return True, ''


# 充值
@check_login
@check_bank('bank')
@verify_format('recharge')
def recharge(**kwargs):
    login_flag, login_info = read_data(path=creat_path('bank', path=f"{login_user_dict['username']}_bank_info.json"))
    balance = kwargs.get('balance')
    login_info[login_user_dict['username']]['balance'] = login_user_dict['balance'] + balance
    txt_login = f"{login_user_dict['username']}|充值{balance}元|余额:{login_user_dict['balance']}-{login_user_dict['balance'] + balance}|时间为{datetime.datetime.now().strftime('%x %X')}"
    creat_flow(txt=txt_login, username=login_user_dict['username'])
    return True, txt_login


@check_login
# 银行信息
def bank_info():
    login_flag, login_info = read_data(path=creat_path('bank', path=f"{login_user_dict['username']}_bank_info.json"))
    print(f"""
    当前登录用户{login_user_dict['username']}的信息如下:
        银行卡号:{login_info[login_user_dict['username']]['bank_id']}
        当前余额:{login_info[login_user_dict['username']]['balance']}
    """)
    return True, ''
# core/user_src.py


from lib import encrypt_data, random_code
from database import read_data, save_data, creat_path, creat_data
from conf import LOG_LOG_DIR, LOG_FLOW_DIR
from functools import wraps
import datetime
import os

login_user_dict = {
    'username': '',
    'bank_id': '',
    'pay_pwd': '',
    'balance': 0
}


# 格式化登录字典
def init_login_dict(username):
    path_bank = creat_path(tag='bank', path=f'{username}_bank_info.json')  # 账号用户信息存放至{username}_bank_info.json中
    flag, data = read_data(path_bank)  # 读取原有的数据
    login_user_dict['username'] = username
    login_user_dict['bank_id'] = data[username]['bank_id']
    login_user_dict['pay_pwd'] = data[username]['pay_pwd']
    login_user_dict['balance'] = data[username]['balance']


# 拿到用户名和密码
def get_username_pwd():
    username = input("请输入用户名:").strip()
    if not username.isidentifier():
        return False, '用户名不可以输入非法字符!'
    password = input("请输入登录密码:").strip()
    if len(password) < 4:
        return False, '密码不可以少于4位!'
    return username, password


# 登录
def login():
    # 用户输入用户名和密码
    for i in range(2):
        username, password = get_username_pwd()
        # 拿到总的一个数据
        path = creat_path(tag='pwd', path='user_pwd.json')
        flag, data = read_data(path=path)
        if not flag:
            return False, data  # 文件不存在就返回错误信息
        # 拿到登录用户的信息
        if username not in data.keys():
            return False, "用户不存在"
        user_dict = data.get(username)
        # 拿到32位加密串
        true_password = user_dict.get('password')[:-4]
        # 将用户输入的密码进行同样的加密
        is_password = encrypt_data(password)
        if true_password == is_password:
            init_login_dict(username=username)
            txt = f"{username}|登录成功|时间为{datetime.datetime.now().strftime('%x %X')}"
            creat_log(txt=txt, username=username)
            return True, txt
        else:
            print("密码错误!")
            continue
    return False, "登录失败!"


# 注册
def register():
    # 读取数据
    path = creat_path(tag='pwd', path='user_pwd.json')
    flag, data = read_data(path=path)
    username, password = get_username_pwd()
    if not username:
        return False, password
    if username in data.keys():
        return False, '用户名已存在!'
    # 使用验证码进行人机校验
    code = random_code(4)
    # 验证码校验
    code_input = input(f"请输入验证码:{code}\n验证码输入:>>>").strip()
    if code.upper() != code_input.upper():
        return "验证码输入错误!"
    # 加密密码
    password = encrypt_data(password)
    # 加入盐,提高密码安全度
    salt = random_code(4)
    password += salt
    # 将注册的用户密码信息加入至json文件
    data[username] = {'username': username, 'password': password}
    # 对新用户数据进行初始化
    path_bank = creat_path(tag='bank', path=f'{username}_bank_info.json')  # 银行信息存放至{username}_bank_info.json中
    user_dict = creat_data(tag='new', username=username)
    save_data(user_dict, mode='w', path=path_bank)  # 初始化新用户数据
    # json文件不可以出现多个字典及多行内容,将用户名作为键名,更新进原字典,并将原字典覆写
    msg = save_data(data=data, mode='w', path=path)
    if msg == 'True':
        txt = f"{username}|注册成功|时间为{datetime.datetime.now().strftime('%x %X')}"
        creat_log(txt=txt, username=username)
        return True, txt
    else:
        return False, msg


# 处理日志和流水
# 创建日志数据
def creat_log(txt, username):
    path = os.path.join(LOG_LOG_DIR, f'{username}_log.txt')
    with open(path, 'a', encoding='utf8') as fp:
        fp.write(txt + '\n')
        fp.flush()


# 创建银行流水数据格式
def creat_flow(txt, username):
    path = os.path.join(LOG_FLOW_DIR, f'{username}_flow.txt')
    with open(path, 'a', encoding='utf8') as fp:
        fp.write(txt + '\n')
        fp.flush()


def check_login(func):
    @wraps(func)
    def inner(*args, **kwargs):
        if login_user_dict['username'] != '':
            flag, msg = func(*args, **kwargs)
            return flag, msg
        else:
            return False, "请先登录!"

    return inner


@check_login
def check_log():  # 查看日志
    path = os.path.join(LOG_LOG_DIR, f'{login_user_dict["username"]}_log.txt')
    with open(path, 'r', encoding='utf8') as fp:
        read_log = fp.read()
    output = (f"当前登录用户{login_user_dict['username']}的日志如下:\n"
              f"{read_log}")
    return True, output


@check_login
def check_flow():  # 查看流水
    path = os.path.join(LOG_FLOW_DIR, f'{login_user_dict["username"]}_flow.txt')
    with open(path, 'r', encoding='utf8') as fp:
        read_flow = fp.read()
    output = (f"当前登录用户{login_user_dict['username']}的流水如下:\n"
              f"{read_flow}")
    return True, output

# core/system.py
# 系统界面


from core.user_src import register, login  # 用户登录注册
from core.user_src import check_log, check_flow  # 用户日志流水
from core.bank_src import activate_bank_id, withdraw_money, transfer_money, recharge, bank_info  # 用户银行业务

import time


def quit_system():  # 退出系统
    return True, 'break'


func_menu = '''
========================ATM=========================
    1.注册            2.登录             3.激活银行卡
    4.取款            5.转账             6.充值
    7.个人日志        8.个人流水          9.个人信息
    10.退出系统                                     
======================欢迎使用=======================
'''
func_dict = {
    1: register,
    2: login,
    3: activate_bank_id,
    4: withdraw_money,
    5: transfer_money,
    6: recharge,
    7: check_log,
    8: check_flow,
    9: bank_info,
    10: quit_system
}


def main_system():
    # 记录开始时间
    start_time = time.time()
    while True:
        # 打印功能菜单
        print(func_menu)
        func_choice = input("请输入功能ID:>>>>").strip()
        if not func_choice.isdigit():
            print(f"{func_choice}不是功能ID,请输入数字!")
            continue
        func_choice = int(func_choice)
        if func_choice not in func_dict.keys():
            print(f"{func_choice}功能不存在,请重新选择!")
            continue
        func = func_dict[func_choice]
        flag, msg = func()
        if flag:
            if msg == 'break':
                print("系统已退出,欢迎下次使用!")
                end_time = time.time()
                use_time = end_time - start_time
                print(f"本次系统共使用了{use_time}秒~")
                break
            print(msg)
        else:
            print(msg)


if __name__ == '__main__':
    main_system()

database

# 数据库
# database/db_hander.py


# 处理数据相关内容
# 读数据 写数据
# 数据处理,读取,保存
import json
import os.path
from conf import USER_DATA_DIR
from conf import BANK_DATA_DIR


def creat_path(tag, path):
    if tag == 'pwd':
        path = os.path.join(USER_DATA_DIR, path)
    elif tag == 'bank':
        path = os.path.join(BANK_DATA_DIR, path)
    return path


def read_data(path):
    try:
        with open(path, 'r', encoding='utf8') as fp:
            data_dict = json.load(fp)
            return True, data_dict
    except FileNotFoundError:
        return False, f"数据读取有误,请检查文件完整性!错误路径:{path}"


def creat_data(tag, **kwargs):
    user_dict = {}
    if tag == 'new':  # 新注册
        user_dict = {kwargs.get('username'): {'bank_id': '', 'pay_pwd': '', 'balance': 0}}
    # elif tag == 'old':  # 老用户
    #     user_dict = {kwargs.get('username')}
    return user_dict


def save_data(data, mode='w', path=None):
    try:
        with open(path, mode, encoding='utf8') as fp:
            # 为防止出现中文显示问题,设置ensure_ascii参数
            json.dump(data, fp, ensure_ascii=False)
            fp.flush()
            return '数据更新成功!'
    except Exception as e:
        return f"意料之外的错误:{e}"

lib

# lib/common.py
# 公用方法
# 获取验证码 # 加密数据
import hashlib
import random


# 获取随机验证码
def random_code(x):
    code = ''
    for i in range(x):
        random_list = [str(random.randint(0, 9)), chr(random.randint(97, 122)), chr(random.randint(65, 90)),
                       random.randint(0, 9)]
        code1 = str(random.choice(random_list))
        code += code1
    return code


# 加密数据函数
def encrypt_data(data):
    md5 = hashlib.md5()
    # 创建一个hash对象
    data = data.encode('utf8')
    # 对数据进行二进制处理
    md5.update(data)
    # 将数据进行加密
    encrypted_data = md5.hexdigest()
    # 16进制加密 # 传回32位的加密串
    return encrypted_data