excel处理
1. 判断数据文件是否存在,如果不存在则创建,如果数据文件为空则抛错
2. 如果数据文件不为空,则判断数据文件名称与传入文件名称是否一致,一致则打开文件进行后续操作,否则抛错
3. 对excel进行处理,根据传入的sheet_index或sheet_name进行数据表获取
4. 循环读取数据,并将数据保存到列表中(有标题行时,每行转换为以标题为键的字典)
5. 写公共方法调用Excel处理类,进行数据处理
def test1(file_name):
"""
判断文件是否存在
:return:
"""
import os
base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
data_path = os.path.join(base_dir, 'dataFile')
data_file_path = os.path.join(base_dir, 'dataFile/{}'.format(file_name))
if not os.path.exists(data_path):
os.mkdir(data_path)
if not os.path.isfile(data_file_path):
raise FileNotFoundError("文件不存在,请检查后重试!")
else:
print(data_file_path)
if __name__ == "__main__":
    # Manual smoke run: look up a sample data file by name.
    test1("test1.txt")
import os
class ExcelParser:
    """
    Read the rows of one sheet of an Excel file stored in ``dataFile``.

    ``dataFile`` is looked up in the parent of the directory that contains
    this module. Parsed rows are cached on the instance.
    """

    def __init__(self, file_name, sheet=0, title_line=True):
        """
        :param file_name: name of the Excel file, relative to ``dataFile``.
        :param sheet: sheet index (``int``) or sheet name (``str``).
        :param title_line: when true, the first row supplies the keys and
            every following row is returned as a dict; otherwise every row
            is returned as a list of cell values.
        :raises FileNotFoundError: if the file is not an existing regular file.
        """
        base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        data_path = os.path.join(base_dir, 'dataFile')
        data_file_path = os.path.join(base_dir, 'dataFile/{}'.format(file_name))
        if not os.path.exists(data_path):
            # exist_ok tolerates the directory being created concurrently.
            os.makedirs(data_path, exist_ok=True)
        if not os.path.isfile(data_file_path):
            raise FileNotFoundError("文件不存在,请检查后重试!")
        self.data_file = data_file_path
        self.sheet = sheet
        self.title_line = title_line
        self._data = []

    def excel_parser(self):
        """
        Return the rows of the configured sheet, parsing the file on first use.

        :return: list of dicts (``title_line`` true) or list of row-value lists.
        :raises TypeError: if ``sheet`` is neither an ``int`` nor a ``str``.
        """
        if self._data:
            return self._data

        # Validate before opening the workbook so a bad argument fails fast.
        # bool is a subclass of int but is not a meaningful sheet selector.
        if isinstance(self.sheet, bool) or not isinstance(self.sheet, (int, str)):
            raise TypeError("传入sheet类型不支持,请检查后重试")

        # NOTE(review): `ow` is not defined in the visible source; presumably
        # it is xlrd's open_workbook imported under that alias -- confirm.
        workbook = ow(self.data_file)
        if isinstance(self.sheet, int):
            sheet = workbook.sheet_by_index(self.sheet)
        else:
            sheet = workbook.sheet_by_name(self.sheet)

        if sheet.nrows == 0:
            # Nothing to read; in particular there is no title row to index.
            return self._data

        if self.title_line:
            title = sheet.row_values(0)
            self._data.extend(
                dict(zip(title, sheet.row_values(row_index)))
                for row_index in range(1, sheet.nrows)
            )
        else:
            self._data.extend(
                sheet.row_values(row_index) for row_index in range(sheet.nrows)
            )
        return self._data
class Config:
    """
    Row-oriented access to the data parsed by ``ExcelParser``.

    Note: YAML and Excel data reading could be merged into a single class.
    """

    def __init__(self):
        # excel_parser is a regular method, so it has to be called; storing
        # the bound method itself made every later get() fail with TypeError.
        self.config = ExcelParser('excel文件地址').excel_parser()

    def get(self, element, index=0):
        """
        Return the value stored under ``element`` in row ``index``.

        :param element: key to look up in the row.
        :param index: position of the row in the parsed data.
        :return: the value, or ``None`` when the row has no such key.
        """
        return self.config[index].get(element)
yaml配置文件处理
import os, yaml
from comm.settings import ConfDate
class YamlParser:
    """Load the documents of a YAML file on first access and cache them."""

    def __init__(self, yaml_file_path):
        """
        :param yaml_file_path: path of the YAML file to read.
        :raises FileNotFoundError: if the path is not an existing regular file.
        """
        path_is_file = os.path.exists(yaml_file_path) and os.path.isfile(yaml_file_path)
        if not path_is_file:
            raise FileNotFoundError("yaml文件不存在,请检查后重试!")
        self.yaml_file_path = yaml_file_path
        self._data = None

    @property
    def yaml_parser(self):
        """Return the list of documents contained in the YAML file."""
        if self._data:
            return self._data
        with open(self.yaml_file_path, 'rb') as stream:
            # safe_load_all is lazy, so it must be consumed while the file is open.
            documents = yaml.safe_load_all(stream)
            self._data = list(documents)
        return self._data
class Config:
    """Key lookup over the documents of the YAML file located at ``ConfDate``."""

    def __init__(self):
        # The class object YamlParser is always truthy, so the documents are
        # loaded unconditionally.
        parser = YamlParser(ConfDate)
        self.config = parser.yaml_parser

    def get(self, element, index=0):
        """
        Return the value stored under ``element`` in document ``index``.

        :param element: key to look up in the document.
        :param index: position of the document inside the YAML file.
        :return: the value, or ``None`` when the document has no such key.
        """
        document = self.config[index]
        return document.get(element)
if __name__ == "__main__":
    # Manual smoke run: print one value from the first YAML document.
    local_url = Config().get("local_url")
    print(local_url)
email处理
"""
邮件类。用来给指定用户发送邮件。可指定多个收件人,可带附件。
"""
import re
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from socket import gaierror, error
from comm.log import Logger
from comm.settings import REPORT_PATH
class Email:
    """Compose a multipart e-mail, optionally with attachments, and send it over SMTP."""

    def __init__(self, server, sender, password, receiver, title, message=None, path=None):
        """Initialise the e-mail.

        :param title: subject of the mail, required.
        :param message: body text, optional.
        :param path: attachment path(s); a list (several files) or a str
            (single file), optional.
        :param server: SMTP server, required.
        :param sender: sender address, also used as the login name, required.
        :param password: sender password, required.
        :param receiver: recipient address(es); several recipients are
            separated by ";", required.
        """
        self.title = title
        self.message = message
        self.files = path
        self.msg = MIMEMultipart('related')
        self.server = server
        self.sender = sender
        self.receiver = receiver
        self.password = password

    def _attach_file(self, att_file):
        """Attach a single file to the message as ``application/octet-stream``.

        :param att_file: path of the file to attach (str).
        """
        # Local import keeps this block self-contained; MIMEApplication
        # produces exactly one Content-Type header (MIMEText plus a manual
        # Content-Type assignment produced two).
        from email.mime.application import MIMEApplication

        # Context manager so the file handle is closed deterministically.
        with open(att_file, 'rb') as attachment:
            payload = attachment.read()
        att = MIMEApplication(payload)
        # Split on either path separator; "|" is not a separator and was only
        # matched because it sat inside the character class.
        file_name = re.split(r'[\\/]', att_file)[-1]
        att.add_header('Content-Disposition', 'attachment', filename=file_name)
        self.msg.attach(att)
        Logger.info('attach file {}'.format(att_file))

    def send(self):
        """Build the headers, body and attachments, then send the mail.

        Connection and authentication failures are logged rather than raised.
        The success message is logged only after the mail was handed to the
        SMTP server.
        """
        recipients = [addr.strip() for addr in self.receiver.split(';') if addr.strip()]

        self.msg['Subject'] = self.title
        self.msg['From'] = self.sender
        # Address lists in mail headers are comma separated.
        self.msg['To'] = ', '.join(recipients)

        if self.message:
            self.msg.attach(MIMEText(self.message))

        if self.files:
            if isinstance(self.files, list):
                for f in self.files:
                    self._attach_file(f)
            elif isinstance(self.files, str):
                self._attach_file(self.files)

        try:
            smtp_server = smtplib.SMTP(self.server)
        # A tuple is required here: `gaierror and error` evaluates to `error` only.
        except (gaierror, error) as e:
            Logger.exception('发送邮件失败,无法连接到SMTP服务器,检查网络以及SMTP服务器. %s', e)
            return

        try:
            smtp_server.login(self.sender, self.password)
        except smtplib.SMTPAuthenticationError as e:
            Logger.exception('邮箱用户名密码验证失败!%s', e)
        else:
            smtp_server.sendmail(self.sender, recipients, self.msg.as_string())
            Logger.info('发送邮件"{0}"成功! 收件人:{1}。如果没有收到邮件,请检查垃圾箱,'
                        '同时检查收件人地址是否正确'.format(self.title, self.receiver))
        finally:
            smtp_server.quit()
def send_mail(day_time, report):
    """
    Send the automated test report by e-mail.

    :param day_time: date text inserted into the mail subject.
    :param report: attachment path(s) passed to ``Email`` as ``path``.
    """
    subject = "{}日的自动化测试报告".format(day_time)
    # Receiver, sender and password are empty placeholders in this source.
    mail = Email(
        server='smtp.163.com',
        sender='',
        password='',
        receiver='',
        title=subject,
        message='这是今天的测试报告,请查收!',
        path=report,
    )
    mail.send()
log处理
log:
file_name: test_case.log
backup: 5
console_level: DEBUG
file_level: DEBUG
pattern: '%(asctime)s - %(message)s'
import logging
import os
from logging.handlers import TimedRotatingFileHandler
from comm.settings import LOG_DIR
from comm.yamlParser import Config
class Logger:
    """Build a logger that writes to a daily-rotated file and to the console.

    Settings are read from the ``log`` entry returned by ``Config``; each
    missing or empty setting falls back to a built-in default.
    """

    def __init__(self, logger_name="fw"):
        """
        :param logger_name: name passed to ``logging.getLogger``.
        """
        self.logger = logging.getLogger(logger_name)
        logging.root.setLevel(logging.INFO)

        # A missing "log" section behaves like an empty one.
        settings = Config().get("log") or {}
        self.log_file_name = settings.get("file_name") or 'test_xx'
        self.back_up_file = settings.get('backup') or 5
        self.console_output_level = settings.get("console_level") or "WARNING"
        self.file_output_level = settings.get('file_level') or 'DEBUG'
        pattern = settings.get('pattern') or '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        self.formatter = logging.Formatter(pattern)

    def get_logger(self):
        """Return the logger, attaching the file and console handlers once."""
        if self.logger.handlers:
            # Handlers are already attached; adding them again would duplicate output.
            return self.logger

        log_path = os.path.join(LOG_DIR, self.log_file_name)
        file_handler = TimedRotatingFileHandler(
            filename=log_path,
            when='D',
            interval=1,
            backupCount=self.back_up_file,
            delay=True,
            encoding='utf-8',
        )
        console_handler = logging.StreamHandler()

        console_handler.setLevel(self.console_output_level)
        file_handler.setLevel(self.file_output_level)
        for handler in (file_handler, console_handler):
            handler.setFormatter(self.formatter)
            self.logger.addHandler(handler)
        return self.logger
# Rebind the module-level name to the configured logging.Logger instance, so
# importers of ``Logger`` receive a ready-to-use logger instead of the class.
Logger = Logger().get_logger()

if __name__ == "__main__":
    # Manual smoke run: emit one message per severity level.
    for level_name in ('debug', 'info', 'warning', 'error', 'critical'):
        getattr(Logger, level_name)('{} message'.format(level_name))
数据库连接处理
import pymysql
class ExecuteError(Exception):
    """Raised when executing a SQL statement or a transaction fails."""
class ConnDb:
    """Small MySQL helper that opens a fresh connection for every operation."""

    def __init__(self, host, user, pwd, db_name, port=3306, charset='utf8', autocommit=False):
        """
        :param host: server address
        :param user: user name
        :param pwd: password
        :param db_name: database name
        :param port: server port
        :param charset: connection character set
        :param autocommit: passed to ``pymysql.connect`` as ``autocommit``
        """
        self.host = host
        self.user = user
        self.pwd = pwd
        self.port = port
        self.db = db_name
        self.charset = charset
        self.autocommit = autocommit

    def connect_db(self):
        """
        Open a connection and create a cursor on it.

        The caller is responsible for closing both objects.

        :return: ``(cursor, connection)`` tuple.
        """
        connect = pymysql.connect(
            host=self.host,
            user=self.user,
            password=self.pwd,
            db=self.db,
            port=self.port,
            charset=self.charset,
            autocommit=self.autocommit
        )
        try:
            cursor = connect.cursor()
        except Exception:
            # Do not leak the connection when the cursor cannot be created.
            connect.close()
            raise
        return cursor, connect

    @staticmethod
    def _release(cursor, connect):
        """Close the cursor, then the connection, even if closing the cursor fails."""
        try:
            cursor.close()
        finally:
            connect.close()

    def _query(self, sql, fetch_all):
        """Run ``sql`` on a fresh connection and fetch one row or all rows."""
        try:
            cursor, connect = self.connect_db()
            try:
                cursor.execute(sql)
                return cursor.fetchall() if fetch_all else cursor.fetchone()
            finally:
                # Runs on success and on failure, so nothing is leaked.
                self._release(cursor, connect)
        except Exception as e:
            raise ExecuteError("sql执行失败,失败原因{}".format(e)) from e

    def db_find_one(self, sql):
        """
        Execute ``sql`` and return a single row.

        :param sql: SQL statement to execute.
        :return: the result of ``cursor.fetchone()``.
        :raises ExecuteError: if connecting, executing or fetching fails.
        """
        return self._query(sql, fetch_all=False)

    def connect_db_execute_file(self, sql):
        """
        Execute a SQL file as one transaction.

        The statement is committed on success and rolled back on failure;
        the cursor and the connection are closed in both cases.

        :param sql: path of the SQL file.
        :raises ExecuteError: if executing or committing fails.
        """
        cursor, connect = self.connect_db()
        try:
            try:
                # NOTE(review): "source" is a mysql command-line client
                # command, not a server-side SQL statement, so the server is
                # expected to reject it -- confirm and replace with reading
                # and executing the file's statements. The path is also
                # interpolated unescaped; only pass trusted values.
                cursor.execute("source %s" % sql)
                connect.commit()
            except Exception as e:
                connect.rollback()
                raise ExecuteError("事务执行失败,失败原因{}".format(e)) from e
        finally:
            self._release(cursor, connect)

    def db_find_all(self, sql):
        """
        Execute ``sql`` and return every row.

        :param sql: SQL statement to execute.
        :return: the result of ``cursor.fetchall()``.
        :raises ExecuteError: if connecting, executing or fetching fails.
        """
        return self._query(sql, fetch_all=True)
if __name__ == "__main__":
    # Manual smoke run against a local database.
    db = ConnDb('localhost', 'root', 'alanyang', 'auto3')
    row = db.db_find_one('SELECT * FROM Env')
    print(row)