ATM(json)

发布时间 2023-12-19 15:03:57作者: 蓝幻ﹺ

【一】架构说明

- ATM # 项目名
	|- README.md # 项目说明书
    |- conf # config 配置 放你项目的配置文件
    	|- settings.py # 配置文件的内容
    |-	lib # 公共方法文件
    	|- common.py
    |-	core # 业务逻辑
    	|-	bank_src.py # 核心代码
        |-	user_src.py # 核心代码
    |-	db # database 数据库目录
    	|- db_hander.py # 数据处理方法
    |-	log # 日志文件
    	|-	flow.log # 日志文件
        |-	user.log # 日志文件
    |- bin # 启动文件目录
    	|- main # 启动程序

【二】详细说明

【1】conf

  • 存储配置文件的目录 settings.py
import os
import sys

BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

def create_flie(path,name):
    create_path = os.path.join(path, name)
    os.makedirs(create_path, exist_ok=True)

create_flie(BASE_DIR,"log")
create_flie(BASE_DIR,"bin")
create_flie(BASE_DIR,"db")
create_flie(BASE_DIR,"lib")
create_flie(BASE_DIR,"core")
create_flie(BASE_DIR,"interface")


path_db = os.path.join(BASE_DIR, 'db')
create_flie(path_db,'user_data')
path_db_user = os.path.join(path_db, 'user_data')
# F:\python项目\untitled\ATM\db\user_data
create_flie(path_db,'bank_data')
path_db_bank = os.path.join(path_db, 'bank_data')
# F:\python项目\untitled\ATM\db\bank_data
path_log = os.path.join(BASE_DIR, 'log')
create_flie(path_log,'user_log')
path_log_user = os.path.join(path_log, 'user_log')
# F:\python项目\untitled\ATM\log\user_log
create_flie(path_log,'flow_log')
path_log_flow = os.path.join(path_log, 'flow_log')
# F:\python项目\untitled\ATM\log\flow_log

【2】lib

  • 公共方法文件 common.py
from datetime import datetime
import hashlib,random

def Codes():
    code=''
    for i in range(4):
        a=random.randint(0,9)
        b=chr(random.randint(97,122))
        c=chr(random.randint(65,90))
        res = random.choice([a,b,c])
        code=code+str(res)
    return code


def encry_password(data):
    md5 = hashlib.md5(data.encode('utf-8'))
    return md5.hexdigest()



def create_data_format(tag=None, **kwargs):
    try:
        if tag=='json':
            data={}
            data[kwargs.get('username')]=kwargs
            return data
        data = '|'.join(kwargs.values())
        if tag == 'log' or tag == 'flow':
            data = f'当前用户 {kwargs.get("username")} 于 {datetime.now().strftime("%Y年%m月%d日 %H时%M分%S秒")} :>>>>{kwargs.get("msg")}'
        return data
    except Exception as error:
        return error

【3】core

(1)user_src

from ATM.db import db_hander
from ATM.lib import common


login_user_dict = {
    'username': None,
    "is_admin": False,
    "bank_id": None,
    "bank_pwd": None,
    "bank_balance": None,
    'tag' : False
}


def get_user_pwd():
    # 【1】要获取用户名和密码
    username = input("用户名 :>>>> ").strip()
    password = input("密码 :>>>> ").strip()
    return username, password


def check(func):
    def inner(*args, **kwargs):
        username, password = get_user_pwd()
        # 校验密码格式 -- 必须是数字且是 3 位
        if not password.isdigit():
            return False, f'当前密码非法!'
        if len(password) != 3:
            return False, f'当前密码必须是3位'
        # 给身份,是管理员还是普通用户
        if username == 'qcc' and password == '123':
            role = 'admin'
        else:
            role = 'normal'
        return True, func(username=username, password=password, role=role,*args, **kwargs)

    return inner


@check
def register(**kwargs):
    username = kwargs.get('username')
    msg,user_list=db_hander.read_data(tag='json',path='user_pwd.json')
    if username in user_list:
        return False,'用户已注册'
    password = kwargs.get('password')
    role = kwargs.get('role')

    salt = common.Codes()
    new_password = common.encry_password(password) + salt

    # 保存用户数据
    pwd_data = common.create_data_format(tag='json', username=username, password=new_password, role=role)
    db_hander.save_data(tag='pwd',path='user_pwd.json', data=pwd_data)

    # 构建日志信息--用户数据
    log_user_data = common.create_data_format(tag='log', username=username, msg=f'注册成功!')
    db_hander.save_data(path=f'{username}_log.txt', data=log_user_data)

    # 初始化银行信息
    bank_data = common.create_data_format(tag='json',username=username, bank_id='1', bank_pwd='None', bank_balance="None")
    db_hander.save_data(tag='bank',path=f'{username}_bank.json', data=bank_data)

    # 银行流水
    log_data = common.create_data_format(tag='log', username=username, msg=f'开卡成功,但未激活!')
    db_hander.save_data(path=f'{username}_log.txt', data=log_data)

    # 银行日志
    flow_data = common.create_data_format(tag='flow', username=username, msg=f'开卡成功,但未激活!')
    db_hander.save_data(path=f'{username}_flow.txt', data=flow_data)
    return True, f'{username} 注册成功!'


def init_bank_data(username):
    flag, user_bank_dict = db_hander.read_data(tag='json', path=f'{username}_bank.json')
    if username not in user_bank_dict:
        return flag, user_bank_dict
    user_bank_data = user_bank_dict.get(username)
    login_user_dict['bank_id'] = user_bank_data.get('bank_id')
    login_user_dict['bank_pwd'] = user_bank_data.get('bank_pwd')
    login_user_dict['bank_balance'] = user_bank_data.get('bank_balance')


@check
def login(**kwargs):
    username = kwargs.get('username')
    password = kwargs.get('password')
    role = kwargs.get('role')
    flag, user_pwd_dict = db_hander.read_data(tag='json', path='user_pwd.json')
    if not flag:
        return False,'未注册'
    if username not in user_pwd_dict:
        flow_data = common.create_data_format(tag='flow', username=username, msg=f'登录失败!')
        db_hander.save_data(path=f'{username}_log.txt', data=flow_data)
        return False, f'当前用户 {username} 不存在!'
    else:
        user_pwd_data = user_pwd_dict.get(username)
        # 比对密码
        new_password = common.encry_password(password)
        print(new_password)
        print(user_pwd_data['password'][:-4])
        if new_password != user_pwd_data['password'][:-4]:
            return False, f'当前密码错误!'
        # 密码正确,登录成功,更新当前的用户字典
        login_user_dict['username'] = username
        if role == 'admin':
            login_user_dict['is_admin'] = True
        else:
            login_user_dict['is_admin'] = False
        init_bank_data(username)
        flow_data = common.create_data_format(tag='flow', username=username, msg=f'登录成功!')
        db_hander.save_data(path=f'{username}_log.txt', data=flow_data)
        return True, f'用户 {username} 登陆成功'

# 验证是否登录过和激活银行卡
def login_auth(func):
    def inner(*args, **kwargs):
        if not login_user_dict['username']:
            return False, f'必须先登录'
        if not login_user_dict['tag'] and func.__name__ != 'add_bank_info':
            return False, '必须先激活银行卡'
        return func(*args, **kwargs)

    return inner


def get_bank_pwd():
    # 【1】要获取用户名和密码
    bank_id = input("请输入银行卡号 :>>>> ").strip()
    bank_pwd = input("请输入支付密码 :>>>> ").strip()
    return bank_id, bank_pwd


# 查看银行流水
@login_auth
def check_bank_flow():
    username = login_user_dict['username']
    flag, user_flow_list = db_hander.read_data(tag='flow', path=f'{username}_flow.txt')
    db_hander.save_data(path=f'{username}_log.txt',
                        data=common.create_data_format(tag="log", username=username, msg="查看余额成功 !"))
    count = 0
    print(f"************* 流水打印开始 *************")
    for flow in user_flow_list:
        count += 1
        print(f"当前第 {count} 条流水 :>>>> {flow}")
    print(f"************* 流水打印结束 *************")
    return True, ''

# 查看个人日志
@login_auth
def check_user_log():
    username = login_user_dict['username']
    flag, data_list = db_hander.read_data(tag='log', path=f'{login_user_dict["username"]}_log.txt')
    if not flag:
        return flag, data_list
    db_hander.save_data(path=f'{username}_log.txt',
                        data=common.create_data_format(tag="log", username=username, msg="查看个人信息成功 !"))
    print(f'''打印当前用户 {login_user_dict["username"]} 日志开始''')
    for data in data_list:
        print(data)
    return True, f'''打印当前用户 {login_user_dict["username"]} 日志结束'''


# 查看银行信息
@login_auth
def check_bank_info():
    username = login_user_dict['username']
    flag, user_bank_dict = db_hander.read_data(tag='json', path=f'{username}_bank.json')
    db_hander.save_data(path=f'{username}_log.txt',
                        data=common.create_data_format(tag="log", username=username, msg="查看银行信息成功 !"))
    user_bank_data = user_bank_dict[username]
    print(f'''
        ------ 当前用户 {username} 信息如下 ------ 
            用   户 :>>>> {username}
            支付密码 :>>>> {user_bank_data['bank_pwd']}
            银行卡号 :>>>> {user_bank_data['bank_id']}
            银行余额 :>>>> {user_bank_data['bank_balance']}
        ''')
    return True, f'当前用户 {username} :>>>> 银行信息查看完成 !'

(2)bank_src

from .user_src import *


# 激活银行卡
@login_auth
def add_bank_info():
    username = login_user_dict['username']
    bank_id, bank_pwd = get_bank_pwd()
    # 激活银行卡
    bank_data = common.create_data_format(tag='json',username=username, bank_id=bank_id, bank_pwd=bank_pwd, bank_balance='1000')
    db_hander.save_data(tag='bank',path=f'{username}_bank.json', data=bank_data)

    # 银行流水
    log_data = common.create_data_format(tag='log', username=username, msg=f'成功激活!')
    db_hander.save_data(path=f'{username}_log.txt', data=log_data)
    # 银行日志
    flow_data = common.create_data_format(tag='flow', username=username, msg=f'成功激活!')
    db_hander.save_data(path=f'{username}_flow.txt', data=flow_data)
    login_user_dict['tag']=True
    init_bank_data(username)
    return True, f'初始化银行信息完成!'


# 检验银行卡
def aaa(tag=None):
    def balance_input(func):
        def inner(*args, **kwargs):
            username = login_user_dict['username']
            flag, user_pwd_dict = db_hander.read_data(tag='json', path=f'{username}_bank.json')
            user_data = user_pwd_dict.get(username)
            bank_id, bank_pwd = get_bank_pwd()
            # 校验银行卡号是否为6位数字
            if not bank_id.isdigit() or len(bank_id) != 6:
                return False, "银行卡号必须是6位数字"
            # 校验银行卡号是否存在
            if bank_id != user_data['bank_id']:
                return False, "银行卡号不存在"
            # 校验支付密码是否为3位数字
            if not bank_pwd.isdigit() or len(bank_pwd) != 3:
                return False, "支付密码必须是3位数字"
            # 校验支付密码是否正确
            if bank_pwd != user_data['bank_pwd']:
                return False, "支付密码错误"
            balance = input("请输入金额 :>>>> ").strip()
            # 校验余额是否为数字
            if not balance.isdigit():
                return False, f"余额不能为零"
            # 校验余额是否大于0小于总余额
            print(user_data['bank_balance'])
            if tag == 'input':
                if int(balance) > int(user_data['bank_balance']):
                    return False, f"余额不足,当前余额为:{user_data['bank_balance']}"
            return func(bank_balance=balance, bank_pwd=bank_pwd, bank_id=bank_id, *args, **kwargs)

        return inner

    return balance_input


# 银行卡提现
@login_auth
@aaa(tag='input')
def get_balance( *args, **kwargs):
    username = login_user_dict['username']
    bank_pwd = kwargs.get('bank_pwd')
    balance = kwargs.get('balance')
    bank_id = kwargs.get('bank_id')
    old_balance = int(login_user_dict['bank_balance']) - int(balance)
    login_user_dict['bank_balance'] = str(old_balance)
    db_hander.save_data(tag='bank',path=f'{username}_bank.json',
                        data=common.create_data_format(tag='json',username=username, bank_id=bank_id, bank_pwd=bank_pwd,
                                          bank_balance=str(old_balance)))
    db_hander.save_data(path=f'{username}_bank_log.txt',
                        data=common.create_data_format(tag="log", username=username,
                                                       msg=f"取出金钱 {balance} 余额为{str(old_balance)}!"))
    db_hander.save_data(path=f'{username}_log.txt',
                        data=common.create_data_format(tag="log", username=username, msg=f"提现 {balance} !"))
    db_hander.save_data(path=f'{username}_flow.txt',
                        data=common.create_data_format(tag="flow", username=username,
                                                       msg=f"提现成功! 当前账户余额 :>>>> {old_balance}"))
    init_bank_data(username)
    return True, f'用户 {username} 提现 {balance} 完成!'


# 转账
@login_auth
@aaa(tag='input')
def transfer_money(**kwargs):
    username = login_user_dict['username']
    balance = kwargs.get('bank_balance')
    bank_id = kwargs.get('bank_id')
    # 获取对方用户账户和银行卡号
    to_username = input("请输入对方用户名 :>>>> ").strip()
    # 获取对方用户的银行信息
    flag, to_bank_data = db_hander.read_data(tag='json', path=f'{to_username}_bank.json')
    # 判断对方用户是否存在
    if username==to_username:
        return False, '不能对自己转账'
    if to_username not in to_bank_data:
        return False, '用户不存在'
    user_data = to_bank_data.get(to_username)
    # 获取对方银行账号
    to_bank_id = input("请输入对方银行卡号 :>>>> ").strip()
    if not to_bank_id.isdigit() or len(to_bank_id) != 6:
        return False, f'银行账号必须为数字,且必须为6位!'
    # 判断用户信息是否正确
    if to_bank_id != user_data['bank_id']:
        return False, f'当前转账账号与目标账号不一致'
    # 获取银行密码:必须是数字,并且必须是三位
    bank_pwd = input("请输入银行密码 :>>>> ").strip()
    if not bank_pwd.isdigit() or len(bank_pwd) != 3:
        return False, f'银行密码必须为数字,且必须为3位!'
    # 校验当前取款密码
    if bank_pwd != user_data['bank_pwd']:
        return False, f'当前取款密码错误!'
    # 转账开始
    # 自己的钱减少
    old_balance = int(login_user_dict['bank_balance']) - int(balance)
    login_user_dict['bank_balance'] = str(old_balance)
    # 对方的钱增加
    to_old_balance = int(user_data['bank_balance']) + int(balance)
    user_data['bank_balance'] = str(to_old_balance)
    #自己信息
    db_hander.save_data(tag='bank',path=f'{username}_bank.json',
                        data=common.create_data_format(tag='json',username=username, bank_id=bank_id,
                                                       bank_pwd=bank_pwd,bank_balance=str(old_balance)))
    db_hander.save_data(path=f'{username}_bank_log.txt',
                        data=common.create_data_format(tag="log", username=username,
                                                       msg=f"取出金钱 {balance} 余额为{str(old_balance)}!"))
    db_hander.save_data(path=f'{username}_log.txt',
                        data=common.create_data_format(tag="log", username=username, msg=f"取出 {balance} !"))
    db_hander.save_data(path=f'{username}_flow.txt',
                        data=common.create_data_format(tag="flow", username=username,
                                                       msg=f"向{to_username}转账{balance}余额为:{old_balance}"))
    #对方信息
    db_hander.save_data(tag='bank',path=f'{to_username }_bank.json',
                        data=common.create_data_format(tag='json',username=to_username , bank_id=bank_id,
                                                       bank_pwd=bank_pwd,bank_balance=str(to_old_balance)))
    db_hander.save_data(path=f'{to_username }_bank_log.txt',
                        data=common.create_data_format(tag="log", username=to_username ,
                                                       msg=f"收到金钱 {balance} 余额为{str(to_old_balance)}!"))
    db_hander.save_data(path=f'{to_username }_log.txt',
                        data=common.create_data_format(tag="log", username=to_username , msg=f"获得 {balance} !"))
    db_hander.save_data(path=f'{to_username }_flow.txt',
                        data=common.create_data_format(tag="flow", username=to_username ,
                                                       msg=f"收到{to_username  }转账{balance}余额为:{to_old_balance}"))
    return True, f'用户 {username} 转账完成!'


# 银行卡充值
@login_auth
@aaa()
def add_balance(bank_pwd, *args, **kwargs):
    username = login_user_dict['username']
    balance = kwargs.get('balance')
    bank_id = kwargs.get('bank_id')
    last_balance = int(login_user_dict['bank_balance']) + int(balance)
    login_user_dict['bank_balance'] = str(last_balance)
    bank_data = common.create_data_format(tag='json',username=username, bank_id=bank_id, bank_pwd=bank_pwd,
                                          bank_balance=str(last_balance))
    db_hander.save_data(tag='bank',path=f'{username}_bank.json', data=bank_data)
    db_hander.save_data(path=f'{username}_bank_log.txt',
                        data=common.create_data_format(tag="log", username=username,
                                                       msg=f"充值金钱 {balance} 余额为{str(last_balance)}!"))
    db_hander.save_data(path=f'{username}_log.txt',
                        data=common.create_data_format(tag="log", username=username, msg=f"充值 {balance} !"))
    db_hander.save_data(path=f'{username}_flow.txt',
                        data=common.create_data_format(tag="flow", username=username,
                                                       msg=f"充值成功! 当前账户余额 :>>>> {last_balance}"))
    init_bank_data(username)
    return True, f'用户 {username} 充值 {balance} 完成!'

【4】db

  • 数据处理方法 db_hander.py
from ATM.conf import settings
import os
import json

def create_path_format(path=None):
    # 用户名和密码 :db/user_data/user_pwd.txt
    a = path.split('_')[-1]
    if 'pwd' in a:
        new_path = os.path.join(settings.path_db_user, path)
    # 银行信息 : db/bank_data/username_bank.txt
    elif 'bank' in a:
        new_path = os.path.join(settings.path_db_bank, path)
    # 日志信息 : log/user_log/username_log.txt|username_bank_log.txt
    elif 'log' in a:
        new_path = os.path.join(settings.path_log_user, path)
    # 流水信息 : log/flow_log/username_flow.txt
    elif 'flow' in a:
        new_path = os.path.join(settings.path_log_flow, path)
    else:
        return False, f'当前参数错误'
    return True, new_path


def read_data(tag=None, path=None, mode='r', encoding='utf8'):
    try:
        user_log_flow_list = []
        flag, new_path = create_path_format(path)
        if not os.path.exists(new_path):
            return False ,''
        if not flag:
            return flag, new_path
        if tag == 'json':
            with open(file=new_path, mode=mode, encoding=encoding) as f:
                data = json.load(f)
            return True, data
        elif tag == 'log' or tag == 'flow':
            with open(file=new_path, mode=mode, encoding=encoding) as fp:
                data = fp.read()
            data_list = data.split('\n')
            data_list.pop()
            for data in data_list:
                user_log_flow_list.append(data)
            return True, user_log_flow_list
        else:
            return False, f'标志非法'
    except Exception as error:
        return False, error


def save_data(tag=None, path=None, data=None, mode='a', encoding='utf8'):
    try:
        # 【1】保存文件的路径及文件名
        flag, new_path = create_path_format(path)
        if not flag:
            return flag, new_path
        # 【3】写入数据
        if tag == 'pwd':
            try:
                with open(new_path, mode='r', encoding=encoding) as f:
                    existing_data = json.load(f)
            except FileNotFoundError:
                existing_data = {}  # 如果文件不存在,则创建一个空字典
            # 合并新数据到已有数据中
            existing_data.update(data)
            with open(file=new_path, mode='w', encoding=encoding) as f:
                f.write(json.dumps(existing_data))
            return True, ''
        if tag == 'bank':
            with open(file=new_path, mode='w', encoding=encoding) as f:
                f.write(json.dumps(data))
        with open(file=new_path, mode=mode, encoding=encoding) as fp:
            fp.write(data + '\n')
        return True, ''
    except Exception as error:
        return False, error

【5】log

  • 日志文件 flow.log 和user.log

【6】bin

  • 启动程序 mian.py
from ATM.core import register,login,check_bank_flow,add_bank_info,get_balance,add_balance,check_user_log,check_bank_info,transfer_money



func_menu = '''
===================用户功能菜单=====================
                  1.注册
                  2.登陆
                  3.激活银行卡
                  4.取款
                  5.转账
                  6.充值余额
                  7.查看银行流水
                  8.查看银行信息
                  9.查看个人日志
======================欢迎使用=======================
'''

func_dict = {
    '1': register,
    '2': login,
    '3': add_bank_info,
    '4': get_balance,
    '5': transfer_money,
    '6': add_balance,
    '7': check_bank_flow,
    '8': check_bank_info,
    '9': check_user_log,
}


def main():
    while True:
        print(func_menu)
        func_id = input("功能ID  :>>>> ").strip()
        if func_id not in func_dict.keys():
            print(f"不存在")
            continue
        func = func_dict[func_id]
        flag, msg = func()
        if flag:
            print(msg)
            if msg=='退出系统':
                break
        else:
            print(msg)
            continue


main()