当先锋百科网

首页 1 2 3 4 5 6 7

excel处理

	1. 判断数据文件是否存在,如果不存在则创建,如果数据文件为空则抛错
	2. 如果数据文件不为空,则判断数据文件名称与传入文件名称是否一致,一致则打开文件进行后续操作,否则抛错
	3. 对excel进行处理,根据传入的sheet_index或sheet_name进行数据表获取
	4. 循环读取数据,并将数据
	5. 写公共方法调用Excel处理类,进行数据处理
	### 测试代码
	#  判断数据文件是否存在,如果不存在则创建,如果数据文件为空则抛错
	#  此处将用到Python os模块
	
	def test1(file_name):
	    """
	    判断文件是否存在
	    :return:
	    """
	    import os
	    # 根路径
	    base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
	    # 数据存储文件目录
	    data_path = os.path.join(base_dir, 'dataFile')
	    # 数据文件
	    data_file_path = os.path.join(base_dir, 'dataFile/{}'.format(file_name))
	    # 判断数据存储文件夹是否存在,不存在则创建
	    if not os.path.exists(data_path):
	        os.mkdir(data_path)
	     # 判断数据文件是否存在,不存在则报错,存在则执行下一步
	    if not os.path.isfile(data_file_path):
	        raise FileNotFoundError("文件不存在,请检查后重试!")
	    else:
	        print(data_file_path)

if __name__ == '__main__':
	# 测试代码
    test1("test1.txt")
# -*- coding: utf-8 -*-
# @Time    : 2021-08-10 15:09
# @Author  : Yangyufeng
import os

class ExcelParser:

    def __init__(self, file_name, sheet=0, title_line=True):
        # 配置文件信息需存放在settings文件中此处为了方便暂时存储 后续将进行转移
        base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        data_path = os.path.join(base_dir, 'dataFile')
        data_file_path = os.path.join(base_dir, 'dataFile/{}'.format(file_name))
        if not os.path.exists(data_path):
            os.mkdir(data_path)
        if not os.path.isfile(data_file_path):
            raise FileNotFoundError("文件不存在,请检查后重试!")
        else:
            self.data_file = data_file_path
        # sheet页索引或者名称
        self.sheet = sheet
        # 是否存在标题行,有标题行,每一行都是都是对应列名的取值;
        # 没有标题行,每一行都是一个列表
        self.title_line = title_line
        self._data = list()

    def excel_parser(self):
        if not self._data:
            wk = ow(self.data_file)
            # xlrd 支持索引和名字取值(int, str)
            if type(self.sheet) not in [int, str]:
                raise TypeError("传入sheet类型不支持,请检查后重试")
            elif type(self.sheet) == int:
                s = wk.sheet_by_index(self.sheet)
            else:
                s = wk.sheet_by_name(self.sheet)
            if self.title_line:
                # 首行为表头
                title = s.row_values(0)
                for col in range(1, s.nrows):
                    self._data.append(dict(zip(title, s.row_values(col))))
            else:
                for col in range(0, s.nrows):
                    self._data.append(s.row_values(col))
            return self._data

	class Config:
		"""
			yaml和Excel数据读取可以放在同一个类进行合并
		"""
	    def __init__(self):
	        if ExcelParser:
	            self.config = ExcelParser('excel文件地址').excel_parser
	
	    def get(self, element, index=0):
	        return self.config[index].get(element)

yaml配置文件处理

	# yaml数据文件处理-在此框架中yaml主要做为config文件存储数据
	# txt xml等都可以做为配置文件使用
	# 拥有数据库权限时,也可以将数据存储在数据库中, 但是需要提前设置数据库连接方法,通过指定类进行数据存储与读取
	import os, yaml
	from comm.settings import ConfDate
	
	class YamlParser:
	    def __init__(self, yaml_file_path):
	        if os.path.exists(yaml_file_path) and os.path.isfile(yaml_file_path):
	            self.yaml_file_path = yaml_file_path
	        else:
	            raise FileNotFoundError("yaml文件不存在,请检查后重试!")
	        self._data = None
	        
	    @property
	    def yaml_parser(self):
	        if not self._data:
	            with open(self.yaml_file_path, 'rb') as f:
	                self._data = list(yaml.safe_load_all(f))
	            return self._data
	
	class Config:
	    def __init__(self):
	        if YamlParser:
	            self.config = YamlParser(ConfDate).yaml_parser
	
	    def get(self, element, index=0):
	        return self.config[index].get(element)
	
	# 单元测试
	if __name__ == '__main__':
	    f = Config().get("local_url")
	    print(f)

email处理

# -*- coding: utf-8 -*-
# @Time    : 2021-08-12 10:53
# @Author  : Yangyufeng
"""
邮件类。用来给指定用户发送邮件。可指定多个收件人,可带附件。
"""
import re
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from socket import gaierror, error
from comm.log import Logger
from comm.settings import REPORT_PATH

class Email:
    def __init__(self, server, sender, password, receiver, title, message=None, path=None):
        """初始化Email
        :param title: 邮件标题,必填。
        :param message: 邮件正文,非必填。
        :param path: 附件路径,可传入list(多附件)或str(单个附件),非必填。
        :param server: smtp服务器,必填。
        :param sender: 发件人,必填。
        :param password: 发件人密码,必填。
        :param receiver: 收件人,多收件人用“;”隔开,必填。
        """
        self.title = title
        self.message = message
        self.files = path

        self.msg = MIMEMultipart('related')

        self.server = server
        self.sender = sender
        self.receiver = receiver
        self.password = password

    def _attach_file(self, att_file):
        """将单个文件添加到附件列表中"""
        att = MIMEText(open('%s' % att_file, 'rb').read(), 'plain', 'utf-8')
        att["Content-Type"] = 'application/octet-stream'
        file_name = re.split(r'[\\|/]', att_file)
        att["Content-Disposition"] = 'attachment; filename="%s"' % file_name[-1]
        self.msg.attach(att)
        Logger.info('attach file {}'.format(att_file))

    def send(self):
        self.msg['Subject'] = self.title
        self.msg['From'] = self.sender
        self.msg['To'] = self.receiver

        # 邮件正文
        if self.message:
            self.msg.attach(MIMEText(self.message))

        # 添加附件,支持多个附件(传入list),或者单个附件(传入str)
        if self.files:
            if isinstance(self.files, list):
                for f in self.files:
                    self._attach_file(f)
            elif isinstance(self.files, str):
                self._attach_file(self.files)

        # 连接服务器并发送
        try:
            smtp_server = smtplib.SMTP(self.server)  # 连接sever
        except (gaierror and error) as e:
            Logger.exception('发送邮件失败,无法连接到SMTP服务器,检查网络以及SMTP服务器. %s', e)
        else:
            try:
                smtp_server.login(self.sender, self.password)  # 登录
            except smtplib.SMTPAuthenticationError as e:
                Logger.exception('邮箱用户名密码验证失败!%s', e)
            else:
                smtp_server.sendmail(self.sender, self.receiver.split(';'), self.msg.as_string())  # 发送邮件
            finally:
                smtp_server.quit()  # 断开连接
                Logger.info('发送邮件"{0}"成功! 收件人:{1}。如果没有收到邮件,请检查垃圾箱,'
                            '同时检查收件人地址是否正确'.format(self.title, self.receiver))

# 邮件配置
def send_mail(day_time, report):
    e = Email(title="{}日的自动化测试报告".format(day_time),
              message='这是今天的测试报告,请查收!',
              receiver='',
              server='smtp.163.com',
              sender='',
              password='',
              path=report
              )
    e.send()

log处理

# yml->日志文件配置信息
log:
    file_name: test_case.log      # 输出日志文件名
    backup: 5                # 备份名
    console_level: DEBUG  # 控制台输出等级
    file_level: DEBUG       # 文件输出等级
	#    pattern: '%(asctime)s - %(name)s - %(levelname)s - %(message)s'  # 打印输出格式
    pattern: '%(asctime)s - %(message)s'  # 打印输出格式
	    
# -*- coding: utf-8 -*-
# @Time    : 2021-08-12 10:52
# @Author  : Yangyufeng

import logging
import os
from logging.handlers import TimedRotatingFileHandler
from comm.settings import LOG_DIR
from comm.yamlParser import Config

class Logger:
    def __init__(self, logger_name="fw"):
        self.logger = logging.getLogger(logger_name)
        logging.root.setLevel(logging.INFO)
        c = Config().get("log")
        self.log_file_name = c.get("file_name") if c and c.get("file_name") else 'test_xx'
        self.back_up_file = c.get('backup') if c and c.get('backup') else 5
        self.console_output_level = c.get("console_level") if c and c.get("console_level") else "WARNING"
        self.file_output_level = c.get('file_level') if c and c.get('file_level') else 'DEBUG'
        pattern = c.get('pattern') if c and c.get('pattern') else '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        self.formatter = logging.Formatter(pattern)

    def get_logger(self):
        if not self.logger.handlers:
            file_handler = TimedRotatingFileHandler(
                filename=os.path.join(LOG_DIR, self.log_file_name),
                when='D',
                interval=1,
                backupCount=self.back_up_file,
                delay=True,
                encoding='utf-8'
            )
            console_handler = logging.StreamHandler()
            console_handler.setLevel(self.console_output_level)
            file_handler.setLevel(self.file_output_level)
            file_handler.setFormatter(self.formatter)
            console_handler.setFormatter(self.formatter)
            self.logger.addHandler(file_handler)
            self.logger.addHandler(console_handler)
        return self.logger

Logger = Logger().get_logger()


if __name__ == '__main__':
    Logger.debug('debug message')
    Logger.info('info message')
    Logger.warning('warning message')
    Logger.error('error message')
    Logger.critical('critical message')

数据库连接处理

# -*- coding: utf-8 -*-
# @Time    : 2021-08-12 10:54
# @Author  : Yangyufeng
import pymysql


class ExecuteError(Exception):
    pass


class ConnDb:
    def __init__(self, host, user, pwd, db_name, port=3306, charset='utf8', autocommit=False):
        """
        :param host: 地址
        :param user: 用户名
        :param pwd: 密码
        :param db_name: 数据库名称
        :param port: 端口
        :param charset:  utf8
        :param autocommit: 插入数据自动提交==conn.commit()
        """
        self.host = host
        self.user = user
        self.pwd = pwd
        self.port = port
        self.db = db_name
        self.charset = charset
        self.autocommit = autocommit

    def connect_db(self):
        """
        step1:连接数据库
        step2:创建游标对象
        step3:对数据库进行增删改查
        step4:关闭游标
        step5:关闭连接
        """
        connect = pymysql.connect(
            host=self.host,
            user=self.user,
            password=self.pwd,
            db=self.db,
            port=self.port,
            charset=self.charset,
            autocommit=self.autocommit
        )
        cursor = connect.cursor()
        return cursor, connect

    def db_find_one(self, sql):
        """
           执行sql, 查询一条
           :param sql:
           :return:
        """
        try:
            cursor, connect = self.connect_db()
            cursor.execute(sql)
            data = cursor.fetchone()
            cursor.close()
            connect.close()
            return data
        except Exception as e:
            raise ExecuteError("sql执行失败,失败原因{}".format(e))

    def connect_db_execute_file(self, sql):
        """
            执行事务
            :param file_path:
            :return:
        """
        cursor, connect = self.connect_db()
        try:
            cursor.execute("source %s" % sql)
            connect.commit()
            cursor.close()
            connect.close()
        except Exception as e:
            connect.rollback()
            raise ExecuteError("事务执行失败,失败原因{}".format(e))

    def db_find_all(self, sql):
        """
            执行sql, 查询全部
            :return:
        """
        try:
            cursor, connect = self.connect_db()
            cursor.execute(sql)
            data = cursor.fetchall()
            cursor.close()
            connect.close()
            return data
        except Exception as e:
            raise ExecuteError("sql执行失败,失败原因{}".format(e))


if __name__ == '__main__':
    data = ConnDb('localhost', 'root', 'alanyang', 'auto3').db_find_one('SELECT * FROM Env')
    print(data)