【一】架构说明
- ATM                   # 项目名
  |- README.md          # 项目说明书
  |- conf               # config 配置目录,放你项目的配置文件
  |  |- settings.py     # 配置文件的内容
  |- lib                # 公共方法目录
  |  |- common.py       # 公共方法
  |- core               # 业务逻辑
  |  |- bank_src.py     # 核心代码(银行业务)
  |  |- user_src.py     # 核心代码(用户业务)
  |- db                 # database 数据库目录
  |  |- db_hander.py    # 数据处理方法
  |- log                # 日志目录
  |  |- flow.log        # 流水日志文件
  |  |- user.log        # 用户日志文件
  |- bin                # 启动文件目录
  |  |- main            # 启动程序
【二】详细说明
【1】conf
import os
import sys
# Project root: the parent of the directory that contains this settings file.
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
def create_flie(path, name):
    """Make sure the directory ``name`` exists inside ``path``.

    Missing parent directories are created too; an already existing
    directory is left untouched (``exist_ok=True``).
    """
    os.makedirs(os.path.join(path, name), exist_ok=True)
# Top-level project directories, created at import time when missing.
for _top_level_dir in ("log", "bin", "db", "lib", "core", "interface"):
    create_flie(BASE_DIR, _top_level_dir)
del _top_level_dir

# Data storage: <BASE_DIR>/db/user_data and <BASE_DIR>/db/bank_data.
path_db = os.path.join(BASE_DIR, 'db')
path_db_user = os.path.join(path_db, 'user_data')
path_db_bank = os.path.join(path_db, 'bank_data')
create_flie(path_db, 'user_data')
create_flie(path_db, 'bank_data')

# Log storage: <BASE_DIR>/log/user_log and <BASE_DIR>/log/flow_log.
path_log = os.path.join(BASE_DIR, 'log')
path_log_user = os.path.join(path_log, 'user_log')
path_log_flow = os.path.join(path_log, 'flow_log')
create_flie(path_log, 'user_log')
create_flie(path_log, 'flow_log')
【2】lib
from datetime import datetime
import hashlib,random
def Codes():
    """Return a random 4-character code.

    For each position one digit (0-9), one lowercase ASCII letter and one
    uppercase ASCII letter are drawn, and one of the three is picked at
    random.
    """
    picked = []
    for _ in range(4):
        digit = random.randint(0, 9)
        lower = chr(random.randint(97, 122))
        upper = chr(random.randint(65, 90))
        picked.append(str(random.choice([digit, lower, upper])))
    return ''.join(picked)
def encry_password(data):
    """Return the hexadecimal MD5 digest of the UTF-8 encoded string ``data``."""
    return hashlib.md5(data.encode('utf-8')).hexdigest()
def create_data_format(tag=None, **kwargs):
    """Build the payload that is later written to a data or log file.

    Args:
        tag: ``'json'`` returns ``{username: kwargs}``; ``'log'`` or
            ``'flow'`` returns one timestamped text line built from the
            ``username`` and ``msg`` keyword arguments; any other value
            returns all keyword values joined with ``'|'``.
        **kwargs: Fields of the record.

    Returns:
        dict | str: The formatted payload. If formatting raises, the
        exception object is returned instead (best-effort contract kept
        from the original implementation).
    """
    try:
        if tag == 'json':
            return {kwargs.get('username'): kwargs}
        if tag in ('log', 'flow'):
            timestamp = datetime.now().strftime("%Y年%m月%d日 %H时%M分%S秒")
            return f'当前用户 {kwargs.get("username")} 于 {timestamp} :>>>>{kwargs.get("msg")}'
        # Convert every value so non-string fields (e.g. an int amount)
        # no longer make the join fail.
        return '|'.join(str(value) for value in kwargs.values())
    except Exception as error:
        return error
【3】core
(1)user_src
from ATM.db import db_hander
from ATM.lib import common
# Session state of the currently logged-in user, shared by the functions below.
login_user_dict = {
    'username': None,  # filled in by login() after a successful password check
    "is_admin": False,  # set by login() according to the role it receives
    "bank_id": None,  # copied from the user's bank file by init_bank_data()
    "bank_pwd": None,  # copied from the user's bank file by init_bank_data()
    "bank_balance": None,  # copied from the user's bank file by init_bank_data()
    'tag' : False  # "card activated" flag checked by login_auth()
}
def get_user_pwd():
    """Prompt on stdin for a user name and a password.

    Returns:
        tuple[str, str]: ``(username, password)``, both stripped of
        surrounding whitespace.
    """
    prompts = ("用户名 :>>>> ", "密码 :>>>> ")
    username, password = (input(prompt).strip() for prompt in prompts)
    return username, password
def check(func):
    """Decorator: read credentials, validate them and call ``func``.

    The wrapper prompts for a user name and password via ``get_user_pwd``,
    rejects passwords that are not exactly three digits, derives a role and
    then calls ``func(username=..., password=..., role=...)``.

    Returns:
        The wrapper. It returns ``(False, message)`` when validation fails,
        otherwise whatever ``func`` returns. The result is passed through
        as-is; it used to be wrapped in an extra ``(True, ...)`` tuple,
        which hid every failure reported by ``func``.
    """
    import functools  # local import keeps this block self-contained

    @functools.wraps(func)
    def inner(*args, **kwargs):
        username, password = get_user_pwd()
        # Password format: digits only, exactly three characters.
        if not password.isdigit():
            return False, f'当前密码非法!'
        if len(password) != 3:
            return False, f'当前密码必须是3位'
        # NOTE(review): the admin account is a hard-coded credential pair;
        # consider moving it to configuration.
        is_admin = username == 'qcc' and password == '123'
        role = 'admin' if is_admin else 'normal'
        return func(*args, username=username, password=password, role=role, **kwargs)

    return inner
@check
def register(**kwargs):
    """Register a new user and create the initial (inactive) bank record.

    Expects ``username``, ``password`` and ``role`` as keyword arguments
    (supplied by the ``check`` decorator).

    Returns:
        tuple[bool, str]: ``(True, message)`` on success, ``(False, reason)``
        when the name is empty, already taken, or the credentials file
        cannot be read or written.
    """
    username = kwargs.get('username')
    password = kwargs.get('password')
    role = kwargs.get('role')
    if not username:
        return False, '用户名不能为空'

    flag, user_pwd_dict = db_hander.read_data(tag='json', path='user_pwd.json')
    if not flag and user_pwd_dict:
        # A missing file yields an empty payload; anything else is a real
        # read error and must not be treated as "nobody registered yet".
        return False, str(user_pwd_dict)
    if flag and isinstance(user_pwd_dict, dict) and username in user_pwd_dict:
        return False, '用户已注册'

    # NOTE(review): the 4-char salt is only appended to the digest, not
    # mixed into the hash; login() strips the last four characters again.
    salt = common.Codes()
    new_password = common.encry_password(password) + salt

    # Persist the credentials.
    pwd_data = common.create_data_format(tag='json', username=username, password=new_password, role=role)
    saved, error = db_hander.save_data(tag='pwd', path='user_pwd.json', data=pwd_data)
    if not saved:
        return False, str(error)

    # User log entry for the registration.
    log_user_data = common.create_data_format(tag='log', username=username, msg=f'注册成功!')
    db_hander.save_data(path=f'{username}_log.txt', data=log_user_data)

    # Placeholder bank record; the card is not activated yet.
    bank_data = common.create_data_format(tag='json', username=username, bank_id='1', bank_pwd='None', bank_balance="None")
    db_hander.save_data(tag='bank', path=f'{username}_bank.json', data=bank_data)

    # User log entry for the new card.
    log_data = common.create_data_format(tag='log', username=username, msg=f'开卡成功,但未激活!')
    db_hander.save_data(path=f'{username}_log.txt', data=log_data)

    # Bank flow entry for the new card.
    flow_data = common.create_data_format(tag='flow', username=username, msg=f'开卡成功,但未激活!')
    db_hander.save_data(path=f'{username}_flow.txt', data=flow_data)
    return True, f'{username} 注册成功!'
def init_bank_data(username):
    """Load ``username``'s bank record into ``login_user_dict``.

    Besides ``bank_id``, ``bank_pwd`` and ``bank_balance`` this refreshes
    the ``tag`` (card activated) flag, so the activation state follows the
    stored record instead of staying at whatever the previous action or
    user left behind.

    Returns:
        tuple[bool, str]: ``(True, '')`` when the record was loaded,
        ``(False, reason)`` when it is missing or unreadable.
    """
    bank_keys = ('bank_id', 'bank_pwd', 'bank_balance')
    flag, user_bank_dict = db_hander.read_data(tag='json', path=f'{username}_bank.json')
    if not flag or not isinstance(user_bank_dict, dict) or username not in user_bank_dict:
        # Never keep another user's card data or activation state around.
        for key in bank_keys:
            login_user_dict[key] = None
        login_user_dict['tag'] = False
        return False, f'当前用户 {username} 银行信息不存在!'

    user_bank_data = user_bank_dict[username]
    for key in bank_keys:
        login_user_dict[key] = user_bank_data.get(key)
    # register() stores the placeholder string 'None' as balance until the
    # card is activated; a real balance means the card is active.
    login_user_dict['tag'] = user_bank_data.get('bank_balance') not in (None, 'None')
    return True, ''
@check
def login(**kwargs):
    """Verify the password of an existing user and open a session.

    Expects ``username``, ``password`` and ``role`` as keyword arguments
    (supplied by the ``check`` decorator). On success ``login_user_dict``
    is updated and the user's bank data is loaded.

    Returns:
        tuple[bool, str]: ``(True, message)`` on success, otherwise
        ``(False, reason)``.
    """
    username = kwargs.get('username')
    password = kwargs.get('password')
    role = kwargs.get('role')

    flag, user_pwd_dict = db_hander.read_data(tag='json', path='user_pwd.json')
    if not flag:
        return False,'未注册'
    if username not in user_pwd_dict:
        flow_data = common.create_data_format(tag='flow', username=username, msg=f'登录失败!')
        db_hander.save_data(path=f'{username}_log.txt', data=flow_data)
        return False, f'当前用户 {username} 不存在!'

    # The stored value is "<md5 hex digest><4-char salt>"; compare digests
    # only. The digests are no longer printed to the console.
    stored_password = user_pwd_dict[username].get('password', '')
    if common.encry_password(password) != stored_password[:-4]:
        return False, f'当前密码错误!'

    # Password is correct: refresh the session state.
    login_user_dict['username'] = username
    login_user_dict['is_admin'] = role == 'admin'
    # Do not inherit the previous user's "card activated" flag.
    login_user_dict['tag'] = False
    init_bank_data(username)

    flow_data = common.create_data_format(tag='flow', username=username, msg=f'登录成功!')
    db_hander.save_data(path=f'{username}_log.txt', data=flow_data)
    return True, f'用户 {username} 登陆成功'
# Guard: require a logged-in user and an activated bank card
def login_auth(func):
    """Decorator that only runs ``func`` for a logged-in, activated user.

    The wrapper returns ``(False, message)`` when nobody is logged in, or
    when the card is not activated and ``func`` is not ``add_bank_info``
    (the activation step itself). Otherwise it returns ``func``'s result.
    """
    import functools  # local import keeps this block self-contained

    @functools.wraps(func)  # keep the wrapped function's name and docstring
    def inner(*args, **kwargs):
        if not login_user_dict['username']:
            return False, f'必须先登录'
        needs_activation = func.__name__ != 'add_bank_info'
        if needs_activation and not login_user_dict['tag']:
            return False, '必须先激活银行卡'
        return func(*args, **kwargs)

    return inner
def get_bank_pwd():
    """Prompt on stdin for a bank card number and a payment password.

    Returns:
        tuple[str, str]: ``(bank_id, bank_pwd)``, both stripped of
        surrounding whitespace.
    """
    prompts = ("请输入银行卡号 :>>>> ", "请输入支付密码 :>>>> ")
    bank_id, bank_pwd = (input(prompt).strip() for prompt in prompts)
    return bank_id, bank_pwd
# View the bank statement (flow records)
@login_auth
def check_bank_flow():
    """Print every flow record of the logged-in user.

    Returns:
        tuple[bool, str]: ``(True, '')`` after printing, or
        ``(False, reason)`` when the flow file cannot be read.
    """
    username = login_user_dict['username']
    flag, user_flow_list = db_hander.read_data(tag='flow', path=f'{username}_flow.txt')
    if not flag:
        # The payload is empty for a missing file, else the read error.
        return False, user_flow_list or '暂无流水记录'

    # Log only after the read succeeded, and name the right action.
    db_hander.save_data(path=f'{username}_log.txt',
                        data=common.create_data_format(tag="log", username=username, msg="查看银行流水成功 !"))
    print(f"************* 流水打印开始 *************")
    for count, flow in enumerate(user_flow_list, start=1):
        print(f"当前第 {count} 条流水 :>>>> {flow}")
    print(f"************* 流水打印结束 *************")
    return True, ''
# View the personal log
@login_auth
def check_user_log():
    """Print the log lines of the logged-in user.

    The log file is read first, then a log entry for this action is
    appended, so the printed lines do not include that new entry.

    Returns:
        tuple: ``(True, closing message)`` after printing, or the failed
        ``(flag, payload)`` pair from the read.
    """
    username = login_user_dict['username']
    log_path = f'{username}_log.txt'
    ok, log_lines = db_hander.read_data(tag='log', path=log_path)
    if not ok:
        return ok, log_lines

    entry = common.create_data_format(tag="log", username=username, msg="查看个人信息成功 !")
    db_hander.save_data(path=log_path, data=entry)

    print(f'''打印当前用户 {username} 日志开始''')
    for line in log_lines:
        print(line)
    return True, f'''打印当前用户 {username} 日志结束'''
# View bank card information
@login_auth
def check_bank_info():
    """Print the stored bank record of the logged-in user.

    Returns:
        tuple[bool, str]: ``(True, message)`` after printing, or
        ``(False, reason)`` when the bank record is missing or unreadable.
    """
    username = login_user_dict['username']
    flag, user_bank_dict = db_hander.read_data(tag='json', path=f'{username}_bank.json')
    if not flag or not isinstance(user_bank_dict, dict) or username not in user_bank_dict:
        return False, f'当前用户 {username} 银行信息不存在!'
    user_bank_data = user_bank_dict[username]

    # Record the action only once the data is known to be available.
    db_hander.save_data(path=f'{username}_log.txt',
                        data=common.create_data_format(tag="log", username=username, msg="查看银行信息成功 !"))
    # NOTE(review): this prints the payment password in clear text.
    print(f'''
------ 当前用户 {username} 信息如下 ------
用 户 :>>>> {username}
支付密码 :>>>> {user_bank_data['bank_pwd']}
银行卡号 :>>>> {user_bank_data['bank_id']}
银行余额 :>>>> {user_bank_data['bank_balance']}
''')
    return True, f'当前用户 {username} :>>>> 银行信息查看完成 !'
(2)bank_src
from .user_src import *
# Activate the bank card
@login_auth
def add_bank_info():
    """Activate the logged-in user's card with an initial balance of 1000.

    The card number must be six digits and the payment password three
    digits, matching what the later transaction checks require. An already
    activated card is not activated again, because that would reset the
    balance.

    Returns:
        tuple[bool, str]: ``(True, message)`` on success, otherwise
        ``(False, reason)``.
    """
    username = login_user_dict['username']
    if login_user_dict['tag']:
        return False, '银行卡已激活,无需重复激活'

    bank_id, bank_pwd = get_bank_pwd()
    if not bank_id.isdigit() or len(bank_id) != 6:
        return False, "银行卡号必须是6位数字"
    if not bank_pwd.isdigit() or len(bank_pwd) != 3:
        return False, "支付密码必须是3位数字"

    # Store the activated record.
    bank_data = common.create_data_format(tag='json', username=username, bank_id=bank_id, bank_pwd=bank_pwd, bank_balance='1000')
    db_hander.save_data(tag='bank', path=f'{username}_bank.json', data=bank_data)
    # User log entry.
    log_data = common.create_data_format(tag='log', username=username, msg=f'成功激活!')
    db_hander.save_data(path=f'{username}_log.txt', data=log_data)
    # Bank flow entry.
    flow_data = common.create_data_format(tag='flow', username=username, msg=f'成功激活!')
    db_hander.save_data(path=f'{username}_flow.txt', data=flow_data)

    login_user_dict['tag'] = True
    init_bank_data(username)
    return True, f'初始化银行信息完成!'
# Verify card number / payment password and read the transaction amount
def aaa(tag=None):
    """Decorator factory for bank transactions.

    The wrapper prompts for the card number and payment password of the
    logged-in user, checks them against the stored bank record, prompts for
    an amount and then calls the wrapped function with the keyword
    arguments ``bank_balance`` (the amount, as a string), ``bank_pwd`` and
    ``bank_id``.

    Args:
        tag: With ``'input'`` the amount must not exceed the stored
            balance (used for withdrawals and transfers).

    Returns:
        The decorator. The wrapper returns ``(False, reason)`` when a check
        fails, otherwise the wrapped function's result.
    """
    def balance_input(func):
        import functools  # local import keeps this block self-contained

        @functools.wraps(func)
        def inner(*args, **kwargs):
            username = login_user_dict['username']
            flag, user_bank_dict = db_hander.read_data(tag='json', path=f'{username}_bank.json')
            user_data = user_bank_dict.get(username) if flag and isinstance(user_bank_dict, dict) else None
            if not user_data:
                return False, f'当前用户 {username} 银行信息不存在!'
            if not str(user_data.get('bank_balance')).isdecimal():
                # A non-numeric stored balance means the card was never activated.
                return False, '必须先激活银行卡'

            bank_id, bank_pwd = get_bank_pwd()
            # Card number: six digits and equal to the stored one.
            if not bank_id.isdigit() or len(bank_id) != 6:
                return False, "银行卡号必须是6位数字"
            if bank_id != user_data['bank_id']:
                return False, "银行卡号不存在"
            # Payment password: three digits and equal to the stored one.
            if not bank_pwd.isdigit() or len(bank_pwd) != 3:
                return False, "支付密码必须是3位数字"
            if bank_pwd != user_data['bank_pwd']:
                return False, "支付密码错误"

            balance = input("请输入金额 :>>>> ").strip()
            # isdecimal() only accepts characters that int() can parse.
            if not balance.isdecimal():
                return False, "金额必须是数字"
            if int(balance) <= 0:
                return False, "金额不能为零"
            if tag == 'input' and int(balance) > int(user_data['bank_balance']):
                return False, f"余额不足,当前余额为:{user_data['bank_balance']}"
            return func(*args, bank_balance=balance, bank_pwd=bank_pwd, bank_id=bank_id, **kwargs)

        return inner

    return balance_input
# Withdraw money from the card
@login_auth
@aaa(tag='input')
def get_balance(*args, **kwargs):
    """Withdraw the amount validated by the ``aaa`` decorator.

    The decorator passes the amount as keyword ``bank_balance``; the old
    code read ``balance`` and therefore always got ``None``. ``balance`` is
    still accepted as a fallback.

    Returns:
        tuple[bool, str]: ``(True, message)`` on success, ``(False, reason)``
        when no amount was supplied.
    """
    username = login_user_dict['username']
    bank_pwd = kwargs.get('bank_pwd')
    bank_id = kwargs.get('bank_id')
    balance = kwargs.get('bank_balance', kwargs.get('balance'))
    if balance is None:
        return False, '缺少金额'

    new_balance = int(login_user_dict['bank_balance']) - int(balance)
    login_user_dict['bank_balance'] = str(new_balance)

    db_hander.save_data(tag='bank', path=f'{username}_bank.json',
                        data=common.create_data_format(tag='json', username=username, bank_id=bank_id,
                                                       bank_pwd=bank_pwd, bank_balance=str(new_balance)))
    db_hander.save_data(path=f'{username}_bank_log.txt',
                        data=common.create_data_format(tag="log", username=username,
                                                       msg=f"取出金钱 {balance} 余额为{str(new_balance)}!"))
    db_hander.save_data(path=f'{username}_log.txt',
                        data=common.create_data_format(tag="log", username=username, msg=f"提现 {balance} !"))
    db_hander.save_data(path=f'{username}_flow.txt',
                        data=common.create_data_format(tag="flow", username=username,
                                                       msg=f"提现成功! 当前账户余额 :>>>> {new_balance}"))
    # Re-sync the session with what was just written.
    init_bank_data(username)
    return True, f'用户 {username} 提现 {balance} 完成!'
def _persist_transfer_side(owner, bank_id, bank_pwd, new_balance, bank_log_msg, log_msg, flow_msg):
    """Write one party's updated bank record plus its bank-log, user-log and flow entries."""
    db_hander.save_data(tag='bank', path=f'{owner}_bank.json',
                        data=common.create_data_format(tag='json', username=owner, bank_id=bank_id,
                                                       bank_pwd=bank_pwd, bank_balance=str(new_balance)))
    db_hander.save_data(path=f'{owner}_bank_log.txt',
                        data=common.create_data_format(tag="log", username=owner, msg=bank_log_msg))
    db_hander.save_data(path=f'{owner}_log.txt',
                        data=common.create_data_format(tag="log", username=owner, msg=log_msg))
    db_hander.save_data(path=f'{owner}_flow.txt',
                        data=common.create_data_format(tag="flow", username=owner, msg=flow_msg))


# Transfer money to another user
@login_auth
@aaa(tag='input')
def transfer_money(**kwargs):
    """Transfer the amount validated by ``aaa`` to another user's card.

    Fixes over the previous version:
      * the confirmation password is checked against the payer's own
        payment password, not the recipient's;
      * each party's record keeps its own card number and password (the
        payer's password and the recipient's card number used to be
        overwritten with the other party's values);
      * the recipient's flow entry names the payer as the sender;
      * an empty or unknown recipient name is rejected instead of crashing.

    Returns:
        tuple[bool, str]: ``(True, message)`` on success, otherwise
        ``(False, reason)``.
    """
    username = login_user_dict['username']
    balance = kwargs.get('bank_balance')
    bank_id = kwargs.get('bank_id')
    # Payer's own payment password, already verified by the decorator.
    own_bank_pwd = kwargs.get('bank_pwd')

    # Recipient account.
    to_username = input("请输入对方用户名 :>>>> ").strip()
    if username == to_username:
        return False, '不能对自己转账'
    flag, to_bank_data = db_hander.read_data(tag='json', path=f'{to_username}_bank.json')
    if not flag or not isinstance(to_bank_data, dict) or to_username not in to_bank_data:
        return False, '用户不存在'
    user_data = to_bank_data[to_username]

    # Recipient card number must be six digits and match the stored one.
    to_bank_id = input("请输入对方银行卡号 :>>>> ").strip()
    if not to_bank_id.isdigit() or len(to_bank_id) != 6:
        return False, f'银行账号必须为数字,且必须为6位!'
    if to_bank_id != user_data['bank_id']:
        return False, f'当前转账账号与目标账号不一致'

    # Confirmation: the payer re-enters their own payment password.
    bank_pwd = input("请输入银行密码 :>>>> ").strip()
    if not bank_pwd.isdigit() or len(bank_pwd) != 3:
        return False, f'银行密码必须为数字,且必须为3位!'
    if bank_pwd != own_bank_pwd:
        return False, f'当前取款密码错误!'

    # Move the money: the payer loses the amount, the recipient gains it.
    amount = int(balance)
    old_balance = int(login_user_dict['bank_balance']) - amount
    to_old_balance = int(user_data['bank_balance']) + amount
    login_user_dict['bank_balance'] = str(old_balance)
    user_data['bank_balance'] = str(to_old_balance)

    # Payer's record and logs.
    _persist_transfer_side(
        username, bank_id, own_bank_pwd, old_balance,
        bank_log_msg=f"取出金钱 {balance} 余额为{str(old_balance)}!",
        log_msg=f"取出 {balance} !",
        flow_msg=f"向{to_username}转账{balance}余额为:{old_balance}",
    )
    # Recipient's record and logs (card number and password stay their own).
    _persist_transfer_side(
        to_username, user_data['bank_id'], user_data['bank_pwd'], to_old_balance,
        bank_log_msg=f"收到金钱 {balance} 余额为{str(to_old_balance)}!",
        log_msg=f"获得 {balance} !",
        flow_msg=f"收到{username}转账{balance}余额为:{to_old_balance}",
    )
    return True, f'用户 {username} 转账完成!'
# Deposit money onto the card
@login_auth
@aaa()
def add_balance(bank_pwd, *args, **kwargs):
    """Deposit the amount validated by the ``aaa`` decorator.

    The decorator passes the amount as keyword ``bank_balance``; the old
    code read ``balance`` and therefore always got ``None``. ``balance`` is
    still accepted as a fallback.

    Args:
        bank_pwd: The payment password, written back into the bank record.

    Returns:
        tuple[bool, str]: ``(True, message)`` on success, ``(False, reason)``
        when no amount was supplied.
    """
    username = login_user_dict['username']
    bank_id = kwargs.get('bank_id')
    balance = kwargs.get('bank_balance', kwargs.get('balance'))
    if balance is None:
        return False, '缺少金额'

    last_balance = int(login_user_dict['bank_balance']) + int(balance)
    login_user_dict['bank_balance'] = str(last_balance)

    bank_data = common.create_data_format(tag='json', username=username, bank_id=bank_id, bank_pwd=bank_pwd,
                                          bank_balance=str(last_balance))
    db_hander.save_data(tag='bank', path=f'{username}_bank.json', data=bank_data)
    db_hander.save_data(path=f'{username}_bank_log.txt',
                        data=common.create_data_format(tag="log", username=username,
                                                       msg=f"充值金钱 {balance} 余额为{str(last_balance)}!"))
    db_hander.save_data(path=f'{username}_log.txt',
                        data=common.create_data_format(tag="log", username=username, msg=f"充值 {balance} !"))
    db_hander.save_data(path=f'{username}_flow.txt',
                        data=common.create_data_format(tag="flow", username=username,
                                                       msg=f"充值成功! 当前账户余额 :>>>> {last_balance}"))
    # Re-sync the session with what was just written.
    init_bank_data(username)
    return True, f'用户 {username} 充值 {balance} 完成!'
【4】db
from ATM.conf import settings
import os
import json
def create_path_format(path=None):
    """Map a data-file name to its full path inside the project tree.

    The part of ``path`` after the last ``'_'`` decides the directory:
    ``pwd`` -> user data, ``bank`` -> bank data, ``log`` -> user log,
    ``flow`` -> flow log (checked in that order).

    Args:
        path: File name such as ``'user_pwd.json'`` or ``'<name>_log.txt'``.

    Returns:
        tuple[bool, str]: ``(True, full_path)``, or ``(False, message)``
        when ``path`` is empty/``None`` or matches no known kind.
    """
    if not path:
        # The default None used to raise AttributeError on .split().
        return False, f'当前参数错误'

    suffix = path.split('_')[-1]
    # (keyword, settings attribute); resolved lazily, first match wins.
    routes = (
        ('pwd', 'path_db_user'),
        ('bank', 'path_db_bank'),
        ('log', 'path_log_user'),
        ('flow', 'path_log_flow'),
    )
    for keyword, setting_name in routes:
        if keyword in suffix:
            return True, os.path.join(getattr(settings, setting_name), path)
    return False, f'当前参数错误'
def read_data(tag=None, path=None, mode='r', encoding='utf8'):
    """Read a JSON record or a line-based log/flow file.

    Args:
        tag: ``'json'`` to parse the file as JSON; ``'log'`` or ``'flow'``
            to return its lines.
        path: File name understood by ``create_path_format``.
        mode: Mode passed to ``open``.
        encoding: Text encoding passed to ``open``.

    Returns:
        tuple: ``(True, data)`` on success; ``(False, message)`` for an
        invalid path or tag; ``(False, '')`` when the file does not exist;
        ``(False, exception)`` when reading or parsing fails.
    """
    try:
        flag, new_path = create_path_format(path)
        # Check the flag first: on failure new_path holds the error message
        # and must not be probed as a file path.
        if not flag:
            return flag, new_path
        if not os.path.exists(new_path):
            return False, ''
        if tag == 'json':
            with open(file=new_path, mode=mode, encoding=encoding) as f:
                return True, json.load(f)
        if tag == 'log' or tag == 'flow':
            with open(file=new_path, mode=mode, encoding=encoding) as fp:
                # Iterating by line keeps a last line that lacks a trailing
                # newline; the old split/pop dropped it.
                return True, [line.rstrip('\n') for line in fp]
        return False, f'标志非法'
    except Exception as error:
        return False, error
def save_data(tag=None, path=None, data=None, mode='a', encoding='utf8'):
    """Persist ``data`` to the file identified by ``path``.

    Args:
        tag: ``'pwd'`` merges the dict ``data`` into the existing JSON
            file; ``'bank'`` overwrites the file with ``data`` as JSON; any
            other value writes ``data`` as one text line.
        path: File name understood by ``create_path_format``.
        data: A dict for the JSON tags, a string otherwise.
        mode: Mode used for the text-line write (append by default).
        encoding: Text encoding passed to ``open``.

    Returns:
        tuple: ``(True, '')`` on success, ``(False, message_or_exception)``
        otherwise.
    """
    try:
        # Resolve the target file.
        flag, new_path = create_path_format(path)
        if not flag:
            return flag, new_path

        if tag == 'pwd':
            try:
                with open(new_path, mode='r', encoding=encoding) as f:
                    existing_data = json.load(f)
            except FileNotFoundError:
                existing_data = {}  # first registration: start from scratch
            # Merge the new record into what is already stored.
            existing_data.update(data)
            with open(file=new_path, mode='w', encoding=encoding) as f:
                f.write(json.dumps(existing_data))
            return True, ''

        if tag == 'bank':
            with open(file=new_path, mode='w', encoding=encoding) as f:
                f.write(json.dumps(data))
            # Return here: falling through to the text write used to raise
            # on ``dict + '\n'`` and report failure after a good write.
            return True, ''

        with open(file=new_path, mode=mode, encoding=encoding) as fp:
            fp.write(data + '\n')
        return True, ''
    except Exception as error:
        return False, error
【5】log
【6】bin
from ATM.core import register,login,check_bank_flow,add_bank_info,get_balance,add_balance,check_user_log,check_bank_info,transfer_money
# Menu text printed by main() before every prompt; the numbers match the keys of func_dict.
func_menu = '''
===================用户功能菜单=====================
1.注册
2.登陆
3.激活银行卡
4.取款
5.转账
6.充值余额
7.查看银行流水
8.查看银行信息
9.查看个人日志
======================欢迎使用=======================
'''
# Dispatch table: menu id typed by the user -> function to call.
func_dict = {
    '1': register,
    '2': login,
    '3': add_bank_info,
    '4': get_balance,
    '5': transfer_money,
    '6': add_balance,
    '7': check_bank_flow,
    '8': check_bank_info,
    '9': check_user_log,
}
def main():
    """Run the interactive menu loop.

    Prints the menu, reads a function id and calls the matching entry of
    ``func_dict``, printing the message it returns. The loop ends when the
    user enters ``q``, when stdin is closed or interrupted, or when a
    function returns ``(True, '退出系统')``. Previously no menu entry could
    produce that message, so the loop could never be left.
    """
    while True:
        print(func_menu)
        try:
            func_id = input("功能ID(输入 q 退出) :>>>> ").strip()
        except (EOFError, KeyboardInterrupt):
            print('退出系统')
            break
        if func_id.lower() == 'q':
            print('退出系统')
            break
        if func_id not in func_dict:
            print(f"不存在")
            continue
        flag, msg = func_dict[func_id]()
        print(msg)
        if flag and msg == '退出系统':
            break
# Start the menu only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()