Category Archive : python相关

jenkins进阶之docker运行pytest并且出allure报告

背景

最近想做一个简单的pytest 测试,用allure出报告,结果发现网上的方法都是在windows上装jenkins,然后用jenkins跑一个本地的运行环境。这种做法明显很不2019年。于是我决定做一个在jenkins上使用docker运行pytest,然后再出allure报告的文章。

思路

  1. 在一台电脑上安装jenkins,可以参考我的文章https://www.yinyubo.cn/?p=268
  2. 准备python代码,放到git上,jenkins通过git拉取代码。代码中包含Dockerfile,测试用例,requirements.txt
  3. 拉取代码之后,通过Dockerfile产生一个python的镜像,并且在镜像中通过requirements.txt去安装pytest,allure的相关依赖
  4. 通过这个镜像,产生一个容器,容器命令运行pytest,产生测试报告,并且挂载到jenkins服务器上.
  5. (注意,这里为什么要在容器里运行pytest呢?因为如果在Dockerfile里运行pytest,一旦产生测试失败,是会导致jenkins退出shell执行的)
  6. Jenkins通过allure插件读取第4步产生的测试报告。生成allure报告

具体步骤

  1. 第一步忽略。请参考文章https://www.yinyubo.cn/?p=268
  2. 准备python代码。产生Dockerfile,测试用例,requirements.txt,如下图


Dockerfile内容如下:

FROM python:3.7.3
WORKDIR .
ADD . .
RUN pip install -r requirements.txt
CMD ["pytest", "-q","TestCase/AIMP/Connect/AIMP_Connect_01.py","--alluredir","allure-results"]

requirements.txt内容如下:

allure-pytest==2.6.2
allure-python-commons==2.6.2
atomicwrites==1.3.0
attrs==19.1.0
colorama==0.4.1
more-itertools==7.0.0
pluggy==0.9.0
py==1.8.0
pytest==4.4.1
six==1.12.0

测试用例可以如下

import pytest
def test_success():
    """this test succeeds"""
    print(1)
    assert True
def test_failure():
    print(2)
    """this test fails"""
    assert False
def test_skip():
    print(3)
    """this test is skipped"""
    pytest.skip('for a reason!')
def test_broken():
    raise Exception('oops')
def test_success2():
    print(4)
    assert True

     3.jenkins配置,这里如何拉取git代码就不写了。这个很简单,主要是构建命令和构建后操作,这里我复制了我的构建命令如下:

name="apitest"
docker build -t $name .
docker run -d --name $name -v $PWD/allure-results:/allure-results $name

效果如图:

    4.运行job,查看allure结果

python常用的几种文件操作,复制,移动,打包

需求

经常会有用到python的文件复制,移动,打包的功能, 我这边用的方法是shuil库来进行这些操作。偶尔穿插了一些图片大小重置pil库和汉字转拼音的方法pypinyin库

代码

# -*- coding: UTF-8 -*-
import os
import shutil
import traceback
import pypinyin
from PIL import Image
from globalLog import ta_log
def copy_file(srcfile, dstfile):
    if not os.path.isfile(srcfile):
        # 一般来说,是因为复制的不是文件所导致的,不影响
        ta_log.info("%s not file!" % (srcfile))
    else:
        fpath, fname = os.path.split(dstfile)  # 分离文件名和路径
        if not os.path.exists(fpath):
            os.makedirs(fpath)  # 创建路径
        shutil.copy2(srcfile, dstfile)  # 移动文件
        ta_log.info("copy %s -> %s" % (srcfile, dstfile))
def movefile(srcfile, dstfile):
    if not os.path.isfile(srcfile):
        ta_log.error("%s not exist!" % (srcfile))
    else:
        fpath, fname = os.path.split(dstfile)  # 分离文件名和路径
        if not os.path.exists(fpath):
            os.makedirs(fpath)  # 创建路径
        shutil.move(srcfile, dstfile)  # 移动文件
        ta_log.info("move %s -> %s" % (srcfile, dstfile))
def copy_file_to_capture(srcfile):
    '''
    复制到capture目录,并且把汉字改成拼音
    :param srcfile:
    :return:
    '''
    fpath, fname = os.path.split(srcfile)
    filename_pre_ = pypinyin.slug(fname)
    dstfile = os.path.abspath(os.getcwd() + "/filepath/attachement/capture/" + filename_pre_)
    copy_file(srcfile, dstfile)
def resize_mage(filein):
    img = Image.open(filein)
    out = img.resize((129, 149), Image.ANTIALIAS)  # resize image with high-quality
    out.save(filein, 'png')
# 只能用于删除本地文件
def delete_many_file(file_list):
    for path in file_list:
        if os.path.exists(path):
            # 删除文件,可使用以下两种方法。
            try:
                os.remove(path)
            except Exception:
                ta_log.error(traceback.format_exc())
            # os.unlink(my_file)
def copy_and_zip(file_list, dst_folder_name):
    '''
    批量复制文件到指定文件夹,然后把指定文件夹的内容压缩成ZIP并且删掉该文件夹
    :param file_list: 文件或文件夹
    :param dst_folder_name: 目标压缩文件的名称
    :return:
    '''
    for item in file_list:
        copy_files_to_attachment(item, dst_folder_name)
    source = os.getcwd() + "/filepath/attachement/" + dst_folder_name
    shutil.make_archive(source, "zip", source)
    shutil.rmtree(source)
def copy_files_to_attachment(srcfile, filename):
    '''
    把文件或文件夹复制到指定目录中
    :param srcfile: 文件或者文件夹的绝对路径
    :param filename: 指定目录
    :return:
    '''
    dstfile = os.path.abspath(os.getcwd() + "/filepath/attachement/")
    folder_name = dstfile + "\\" + filename + "\\"
    if not os.path.isfile(srcfile):
        last_name = os.path.basename(srcfile)
        destination_name = folder_name + last_name
        shutil.copytree(srcfile, destination_name, symlinks=True)
    else:
        fpath, fname = os.path.split(folder_name)  # 分离文件名和路径
        if not os.path.exists(fpath):
            os.makedirs(fpath)  # 创建路径
        shutil.copy2(srcfile, folder_name)  # 移动文件
        print("copy %s -> %s" % (srcfile, folder_name))
def copy_file_to_folder(srcfile, folder_name):
    '''
       把文件或文件夹复制到指定目录中
       :param srcfile: 文件或者文件夹的绝对路径
       :param filename: 指定目录
       :return:
       '''
    if not os.path.isfile(srcfile):
        last_name = os.path.basename(srcfile)
        destination_name = folder_name + last_name
        shutil.copytree(srcfile, destination_name, symlinks=True)
    else:
        fpath, fname = os.path.split(folder_name)  # 分离文件名和路径
        if not os.path.exists(fpath):
            os.makedirs(fpath)  # 创建路径
        shutil.copy2(srcfile, folder_name)  # 移动文件
        print("copy %s -> %s" % (srcfile, folder_name))

python生成bat脚本,并且执行bat脚本

需求

关键词:python,sqllite,bat,进程
最近有一个需求,需要用python读取一个txt文件里的sql语句,用sqllite去执行,执行之前先备份一次sqllite数据库到临时目录。然后sql执行成功则删除临时文件,sql执行失败则使用备份的sqllite去覆盖掉当前错误的sqllite数据库。

关键点

  • 生成临时目录。使用python的tempfile库
  • 复制文件,使用python的shutil库
  • 从文本中读取sql,使用open方法:
  • f = open(file_name, 'r')
    sql = f.read()
  • 生成bat文件,也使用open方法。模式选择w
  • bat命令中的暂停使用@ping -n 5 127.1 >nul 2>nul
  • bat命令中的删除使用“del 路径”
  • bat命令中的复制使用“copy 源文件 目标文件”
  • bat命令执行完后,等待用户手动关闭使用pause,自动关闭使用exit

部分代码

# -*- coding: UTF-8 -*-
# 命名方式为表名_操作_字段
import os
import tempfile
import connectDB
from controller import fileController
curosrdiction = connectDB.curosr_diction
database_back_folder = ""
def exec_sql(sqltext):
    '''
    执行sql,成功返回true
    :param sqltext: sql语句
    :return:
    '''
    result = curosrdiction.executescript(sqltext)
    curosrdiction.commit()
    return result
def backup_database():
    '''
    备份数据库
    :return:
    '''
    global database_back_folder
    database_back_folder = tempfile.mkdtemp()
    fileController.copy_file_to_folder('resource/sqllite.db', database_back_folder)
    write_bat()
def restore_database():
    '''
    恢复数据库
    :return:
    '''
    curosrdiction.close()
    fileController.copy_file(srcfile=database_back_folder + "\\sqllite.db", dstfile='resource/sqllite.db')
def read_sql_from_text(file_name):
    '''
    从文本文件中读取sql并且执行,执行失败,就把原来的数据库覆盖回来
    :param file_name:
    :return:
    '''
    f = open(file_name, 'r')
    sql = f.read()
    if not exec_sql(sql):
        run_bat()
def write_bat():
    sql_database_path = os.getcwd() + "\\resource\\sqllite.db"
    sql_database_path_shm = os.getcwd() + "\\resource\\sqllite.db-shm"
    sql_database_path_wal = os.getcwd() + "\\resource\\sqllite.db-wal"
    bat_name = 'copy.bat'
    s1 = '''@echo off
@ping -n 5 127.1 >nul 2>nul
echo delete database...
del ''' + sql_database_path + '''
del ''' + sql_database_path_shm + '''
del ''' + sql_database_path_wal + '''
echo restore database...
@ping -n 5 127.1 >nul 2>nul
copy ''' + database_back_folder + "\\sqllite.db " + sql_database_path + '''
echo restore finish
pause'''
    f = open(bat_name, 'w')
    f.write(s1)
    f.close()
def run_bat():
    '''
    运行bat
    :return:
    '''
    os.system('start copy.bat')
def update_sql():
    '''
    开始更新数据库的主入口
    :return:
    '''
    backup_database()
    read_sql_from_text("resource/download/1.2.3_sql.txt")

wxpython 从剪贴板读取文件,读取文字,读取图像

需求

前段时间有这样一个需求,要读取用户的剪贴板的内容,然后把剪贴板的信息复制到另一个地方。例如:

  • 当用户复制的是图片时,把图片复制到一个指定位置。
  • 当用户复制的是txt中的一段文字时,获得复制的文字内容。
  • 当用户复制的是一个文件时,获得复制的文件名和路径,然后复制到一个指定位置。

设计

1.通过wx自带的检查剪贴板功能。
wx.TheClipboard.IsSupported(wx.DataFormat(wx.DF_BITMAP)) 这样的方式,判断是不是图片。
文字和文件的方法类似,可以在下面的代码里看到。
2.通过wx.TheClipboard.GetData(file_obj)的方法获得剪贴板的内容
3.如果是图片,通过wx.BitmapDataObject()的GetBitmap()方法获得图片信息。再通过SaveFile(name=filename, type=wx.BITMAP_TYPE_BMP)方法保存图片
4.如果是文字。通过wx.TextDataObject()的GetText()方法获得文字内容。
5.如果是文件。通过wx.FileDataObject()的GetFilenames()获得复制的文件列表。然后可以通过shutil库的copy2方法复制文件到指定位置

wxpython窗体的部分代码

#!/usr/bin/env python
import wx
class MyFrame(wx.Frame):
    def __init__(self, parent):
        wx.Frame.__init__(self, parent, title="Paste Button Demo")
        self.text = wx.TextCtrl(self, style=wx.TE_MULTILINE | wx.HSCROLL)
        self.count = 4  # gets incremented
        menu = wx.MenuBar()
        edit = wx.Menu()
        paste = edit.Append(wx.ID_PASTE, "&Paste", "Paste from the clipboard")
        menu.Append(edit, "&Edit")
        self.SetMenuBar(menu)
        self.toolbar = self.CreateToolBar()
        bmp = wx.ArtProvider.GetBitmap(wx.ART_PASTE, wx.ART_TOOLBAR)
        self.toolbar.AddTool(wx.ID_PASTE,"1234",bmp)
        self.toolbar.Realize()
        self.Bind(wx.EVT_IDLE, self.update_ui)
        self.Bind(wx.EVT_UPDATE_UI, self.update_ui, id=wx.ID_PASTE)
        wx.UpdateUIEvent.SetUpdateInterval(75)
        self.UpdateWindowUI()
    def update_ui(self, event):
        if event.GetId() == wx.ID_PASTE:  # check this less frequently, possibly expensive
            self.count += 1
            if self.count < 5:
                return
            if not wx.TheClipboard.IsOpened():
                self.count = 0
                wx.TheClipboard.Open()
                success = wx.TheClipboard.IsSupported(wx.DataFormat(wx.DF_BITMAP))
                success2 = wx.TheClipboard.IsSupported(wx.DataFormat(wx.DF_TEXT))
                success3 = wx.TheClipboard.IsSupported(wx.DataFormat(wx.DF_ENHMETAFILE))
                success4 = wx.TheClipboard.IsSupported(wx.DataFormat(wx.DF_FILENAME))
                success5 = wx.TheClipboard.IsSupported(wx.DataFormat(wx.DF_LOCALE))
                print("success"+str(success))
                print("success2"+str(success2))
                print("success3" + str(success3))
                print("success4" + str(success4))
                print("success5" + str(success5))
                if success2:
                    text_obj = wx.TextDataObject()
                    if wx.TheClipboard.IsOpened() or wx.TheClipboard.Open():
                        if wx.TheClipboard.GetData(text_obj):
                            value = text_obj.GetText()
                        wx.TheClipboard.Close()
                    self.text.SetValue(value)
                elif success4:
                    file_obj = wx.FileDataObject()
                    if wx.TheClipboard.IsOpened() or wx.TheClipboard.Open():
                        if wx.TheClipboard.GetData(file_obj):
                            value = file_obj.GetFilenames()
                            print(value[0])
                        wx.TheClipboard.Close()
                    self.text.SetValue(value[0])
                else:
                    event.Enable(False)
                    self.text.SetValue("You can't paste. :(")
app = wx.App(False)
f = MyFrame(None)
f.Show()
app.MainLoop()

奇葩需求之-不使用python的第三方库做一个简单的网页

需求

最近接了一个特别奇葩的需求,要求在最基础的python2.7的docker环境上运行一个小型网站,不能使用额外的第三方库。因为之前都用flask,忽然不用库,有一点懵逼。然后找了一圈,发现用python自带的httpserver似乎可以完成这样的任务。代码如下:

代码

index.html

<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=ISO-8859-1">
<title>Insert title here</title>
</head>
<body>
	<form action="/signin" method="post">
		phone:<input type="text" name="phone"><br>
		vscode:<input type="password" name="vscode"><br>
		<input type="submit" value="login">
	</form>
</body>
</html>

test.py

# -*- coding:utf-8 -*-
# author: lichmama
# email: nextgodhand@163.com
# filename: httpd.py
import io
import os
import sys
import urllib
from BaseHTTPServer import HTTPServer
from SimpleHTTPServer import SimpleHTTPRequestHandler
import urllib2
import json
class MyRequestHandler(SimpleHTTPRequestHandler):
    protocol_version = "HTTP/1.1"
    server_version = "PSHS/0.1"
    sys_version = "Python/2.7.x"
    def get_evns(self):
        evns = os.environ
        return ' \\n '.join([k+"="+v for k,v in evns.items()])
    def do_GET(self):
        if self.path == "/" or self.path == "/index":
            content = open("index.html", "rb").read()
            self.send_head(content)
        elif self.path == "/evns":
            content = self.get_evns()
            self.send_head(content)
        else:
            path = self.translate_path(self.path)
            system=sys.platform
            osv=os.environ
            if osv.has_key('HOMEPATH'):
                content = os.environ['HOMEPATH']
            else:
                content = os.environ['HOME']
            self.send_head(content)
            if os.path.exists(path):
                extn = os.path.splitext(path)[1].lower()
                content = open(path, "rb").read()
                self.send_head(content, type=self.extensions_map[extn])
            else:
                content = open("404.html", "rb").read()
                self.send_head(content, code=404)
        self.send_content(content)
    def do_POST(self):
        if self.path == "/signin":
            data = self.rfile.read(int(self.headers["content-length"]))
            data = urllib.unquote(data)
            data = self.parse_data(data)
            test_data = {"username": "zhangyong_new","userpasswd": "123456"}
            requrl = "https://dev.honcloud.honeywell.com.cn/dashboard/usercentre/login"
            header = {"Content-type": "application/json"}
            test_data_urlencode=json.dumps(test_data)
            req = urllib2.Request(url=requrl, data=test_data_urlencode,headers=header)
            res_data = urllib2.urlopen(req)
            res = res_data.read()
            print res
            self.send_head(res)
            self.send_content(res)
    def parse_data(self, data):
        ranges = {}
        for item in data.split("&"):
            k, v = item.split("=")
            ranges[k] = v
        return ranges
    def send_head(self, content, code=200, type="text/html"):
        self.send_response(code)
        self.send_header("Content-Type", type)
        self.send_header("Content-Length", str(len(content)))
        self.end_headers()
    def send_content(self, content):
        f = io.BytesIO()
        f.write(content)
        f.seek(0)
        self.copyfile(f, self.wfile)
        f.close() if __name__ == "__main__":
    if len(sys.argv) == 2:
        # set the target where to mkdir, and default "D:/web"
        MyRequestHandler.target = sys.argv[1]
    try:
        server = HTTPServer(("0.0.0.0", 8282), MyRequestHandler)
        print "pythonic-simple-http-server started, serving at http://%s:%d"%(server.server_address[0],server.server_address[1])
        print server.server_address
        server.serve_forever()
    except KeyboardInterrupt:
        server.socket.close()

通过Queue解决sqllite多线程报错的问题(实现多线程增删改查,以字典形式查询结果)

需求:

小程序后台用的sqllite数据库,刚开始用的时候,没有考虑多线程,而且当时因为数据量少,没有出现过多线程查询报错,现在数据量大了。多线程查询经常报错

ProgrammingError: Recursive use of cursors not allowed.

就是这个头疼的错。在网上查了大量的资料,要么就是加lock=threading.lock(),要么就是加sleep.终究还是解决不了问题。
刚好最近在网上看了一个小哥哥用Queue来解决这个问题。我改进了一下。目前能够使用该方法进行增删改查。查询出来的结果以字典的形式返回。
话不多说,下面上代码

代码

# -*- coding: UTF-8 -*-
import sqlite3
import time
from Queue import Queue
from threading import Thread
def sqllite_escape(key_word):
    key_word = key_word.encode("utf-8")
    key_word = key_word.replace("'", "''")
    return key_word
class SelectConnect(object):
    '''
    只能用来查询
    '''
    def __init__(self):
        # isolation_level=None为智能提交模式,不需要commit
        self.conn = sqlite3.connect('resource/data.ta', check_same_thread=False, isolation_level=None)
        self.conn.execute('PRAGMA journal_mode = WAL')
        cursor = self.conn.cursor()
        cursor.execute('PRAGMA synchronous=OFF')
        self.conn.text_factory = str
        # 把结果用元祖的形式取出来
        self.curosr = self.conn.cursor()
        self.conn.row_factory = self.dict_factory
        # 把结果用字典的形式取出来
        self.curosr_diction = self.conn.cursor()
    def commit(self):
        self.conn.commit()
    def dict_factory(self, cursor, row):
        d = {}
        for idx, col in enumerate(cursor.description):
            d[col[0]] = row[idx]
        return d
    def close_db(self):
        # self.curosr.close()
        self.conn.close()
class SqliteMultithread(Thread):
    """
    Wrap sqlite connection in a way that allows concurrent requests from multiple threads.
    This is done by internally queueing the requests and processing them sequentially
    in a separate thread (in the same order they arrived).
    """
    def __init__(self, filename, autocommit, journal_mode):
        super(SqliteMultithread, self).__init__()
        self.filename = filename
        self.autocommit = autocommit
        self.journal_mode = journal_mode
        self.reqs = Queue()  # use request queue of unlimited size
        self.setDaemon(True)  # python2.5-compatible
        self.running = True
        self.start()
    def dict_factory(self, cursor, row):
        # field = [i[0] for i in cursor.description]
        # value = [dict(zip(field, i)) for i in records]
        d = {}
        for idx, col in enumerate(cursor.description):
            d[col[0]] = row[idx]
        return d
    def run(self):
        if self.autocommit:
            conn = sqlite3.connect(self.filename, isolation_level=None, check_same_thread=False)
        else:
            conn = sqlite3.connect(self.filename, check_same_thread=False)
        conn.execute('PRAGMA journal_mode = %s' % self.journal_mode)
        conn.text_factory = str
        cursor = conn.cursor()
        cursor.execute('PRAGMA synchronous=OFF')
        conn.row_factory = self.dict_factory
        curosr_diction = conn.cursor()
        curosr_diction.execute('PRAGMA synchronous=OFF')
        # 把结果用字典的形式取出来
        while self.running:
            req, arg, res = self.reqs.get()
            if req == '--close--':
                break
            elif req == '--commit--':
                conn.commit()
            else:
                # print(arg)
                curosr_diction.execute(req, arg)
                # if res:
                #     for rec in cursor:
                #         res.put(rec)
                #     res.put('--no more--')
                if res:
                    res.put(curosr_diction.fetchall())
                if self.autocommit:
                    conn.commit()
        conn.close()
    def execute(self, req, arg=None, res=None):
        """
        `execute` calls are non-blocking: just queue up the request and return immediately.
        """
        self.reqs.put((req, arg or tuple(), res))
    def executemany(self, req, items):
        for item in items:
            self.execute(req, item)
    def select_all_dict(self, req, arg=None):
        '''
        直接返回一个list
        :param req:
        :param arg:
        :return:
        '''
        res = Queue()  # results of the select will appear as items in this queue
        self.execute(req, arg, res)
        rec = res.get()
        return rec
    def select_one_dict(self, req, arg=None):
        '''
        直接返回list里的第一个元素,并且以字典展示
        :param req:
        :param arg:
        :return:
        '''
        res = Queue()  # results of the select will appear as items in this queue
        self.execute(req, arg, res)
        rec = res.get()
        if len(rec) != 0:
            rec = rec[0]
        else:
            rec = None
        return rec
    def commit(self):
        self.execute('--commit--')
    def close(self):
        self.execute('--close--')
class Cursor(object):
    '''
    以元祖的形式查询出数据
    '''
    def __init__(self):
        old_con = SelectConnect()
        self.conn = old_con.conn
        self.curosr = old_con.curosr
        self.curosr2 = SqliteMultithread('resource/data.ta', autocommit=True, journal_mode="WAL")
    def execute(self, string, *args):
        try:
            if string.startswith('select'):
                return self.curosr.execute(string, *args)
            else:
                return self.curosr2.execute(string, *args)
        except Exception:
            print("失败一次")
            print(string)
            time.sleep(0.1)
            self.execute(string, *args)
    def executescript(self, string):
        try:
            self.curosr.executescript(string)
        except Exception:
            print("失败一次")
            print(string)
            time.sleep(0.1)
            self.executescript(string)
    def fetchall(self):
        return self.curosr.fetchall()
    def fetchone(self):
        return self.curosr.fetchone()
    def rowcount(self):
        return self.curosr.rowcount
    def close(self):
        self.curosr2.running = False
        self.curosr.close()
        self.conn.close()
class Curosrdiction(object):
    '''
    以字典的形式查询出数据,建议全部用这种。
    '''
    def __init__(self):
        old_con = SelectConnect()
        self.conn = old_con.conn
        self.curosrdiction = old_con.curosr_diction
        self.curosr2 = SqliteMultithread('resource/data.ta', autocommit=True, journal_mode="WAL")
    def execute(self, string, *args):
        try:
            if string.startswith('select'):
                return self.curosrdiction.execute(string, *args)
            else:
                return self.curosr2.execute(string, *args)
        except Exception:
            print("失败一次")
            print(string)
            time.sleep(0.1)
            self.execute(string, *args)
    def executescript(self, string):
        result = True
        try:
            self.curosrdiction.executescript(string)
        except Exception:
            print("失败一次")
            # print(string)
            time.sleep(0.1)
            # self.executescript(string)
            result = False
        return result
    def fetchall(self):
        return self.curosrdiction.fetchall()
    def fetchone(self):
        return self.curosrdiction.fetchone()
    def rowcount(self):
        return self.curosrdiction.rowcount
    def select_all_dict(self, string, *args):
        return self.curosr2.select_all_dict(string, *args)
    def select_one_dict(self, string, *args):
        return self.curosr2.select_one_dict(string, *args)
    def close(self):
        self.curosr2.running = False
        self.curosrdiction.close()
        self.conn.close()
    def commit(self):
        self.conn.commit()
        self.curosr2.commit()
# curosr = Cursor()
curosr_diction = Curosrdiction()
def commit():
    curosr_diction.commit()
def close_db():
    # curosr.close()
    curosr_diction.close()

jira-api使用gevent,快速批量修改数据

需求

因为我们本地数据库中经常会产生大量的数据,这些数据需要同步至jira.覆盖掉jira上的老数据。
因为jira服务器部署在国外,连接巨慢。每次修改一个,至少需要花费10秒左右的时间。
而jira-api没有提供批量或者异步同步的方法,所以需要自己使用一个异步的方式来实现一次性把所有的请求发送出去。
jira服务端有时候吃不消,会关闭掉我的一些连接,所以还需要在被关闭之后,记录下这些同步至jira失败的,重新继续修改操作

用到的库

gevent  库

gevent是一个基于libev的并发库。它为各种并发和网络相关的任务提供了整洁的API。

jira库。
里面有jira登录,修改,新增issue的方法

代码实现

# -*- coding: UTF-8 -*-
import datetime
import random
import gevent
from gevent import monkey
from jira import JIRA
from model import addDefectDAO
monkey.patch_all()
issue_list = []
def update(issue_key):
    # 产生一点小小的间隔,避免一瞬间发出去100多个请求
    gevent.sleep(random.randint(1, 5) * 0.001)
    global issue_list
    starttime = datetime.datetime.now()
    print(starttime)
    print("start:" + issue_key)
    print("start:" + issue_key)
    issue = jira.issue(issue_key)
    issue_dict = {
        'summary': issue_key
    }
    try:
        issue.update(fields=issue_dict)
        issue_list.remove(issue_key)
    except Exception:
        pass
    print("end:" + issue_key)
# 从一个本地数据库取出一堆issue的信息,602是本地的project信息编号
all = addDefectDAO.defect_select_not_draft_by_project_id(602)
for item in all:
    issue_list.append(item["issue_key"])
url = "https://www.jira.com"
username = "username"
password = "password"
jira = JIRA(server=url, basic_auth=(username, password), max_retries=0, timeout=400, async_=True, async_workers=5)
# jira2 = Jira(url=url, username=username, password=password)
print("登录成功")
starttime = datetime.datetime.now()
api_list = []
while issue_list != []:
    for i in issue_list:
        api_list.append(gevent.spawn(update, i))
    gevent.joinall(api_list)
endtime = datetime.datetime.now()
print('总耗时')
print((endtime - starttime).seconds)
print('*******')

注意事项

1.需要引用monkey.patch_all()
2.引用monkey.patch_all()之后会有错误信息:

UserWarning: libuv only supports millisecond timer resolution; all times less will be set to 1 ms

这个是已知bug,在11月29日有人在github上已经提过了,作者会在之后的版本修改。目前不影响使用。请忽略
3.通过这样的方法修改之后,经过我的测试,速度大概快了5倍以上

通过python-docx给word文档中的指定位置添加表格

需求

1.读取一个已有的word文档。docx格式。
2.在该word文档中,通过一个给定的文字。找到该位置。在该位置的下方添加一个表格。例如在图中“BUG情况表”的下方插入一个表格

3.表格内容如下。要求添加完该表格后,如果表格内容发生变更。还能再次通过该程序,修改表格里的数据。

设计

通过python-docx读取word文档。通过document.paragraphs定位指定文字的位置。
通过xlwings读取excel的内容,存成list[list[]]。
通过docx的add_table增加一个表格,并且更改表头颜色,合并表格等操作
通过识别表头的第一行,判断是否是已经存在这个表格,来决定是否要删除原表格

代码

# -*- coding: UTF-8 -*-
import sys
from copy import deepcopy
import xlwings
from docx import Document
from docx.oxml.ns import nsdecls
from docx.oxml import parse_xml
def copy_table_after(table, paragraph):
    tbl, p = table._tbl, paragraph._p
    new_tbl = deepcopy(tbl)
    p.addnext(new_tbl)
def move_table_after(table, paragraph):
    tbl, p = table._tbl, paragraph._p
    p.addnext(tbl)
def get_excel_date(filename):
    '''
    获得excel里的所有内容,返回list
    :param filename:  excel路径
    :return: list[list[]]
    '''
    app = xlwings.App(visible=False, add_book=True)
    app.display_alerts = False
    app.screen_updating = False
    wb = app.books.open(filename)
    sht = wb.sheets[0]
    rng = sht.range('A1')
    # 把excel里的数据读取成 年-月-日 时:分:秒的格式
    my_date_handler = lambda year, month, day, hour, minute, second, **kwargs: "%04i-%02i-%02i %02i:%02i:%02i" % (
        year, month, day, hour, minute, second)
    # 取出所有内容,这里用ig这个变量,是为了庆祝I.G获得LOL S8赛季总冠军
    ig = rng.current_region.options(index=False, numbers=int, empty='N/A', dates=my_date_handler)
    result = ig.value
    wb.close()
    app.quit()
    return result
def delete_table_with_title(document,expect_text):
    allTables = document.tables
    for activeTable in allTables:
        if activeTable.cell(0, 0).paragraphs[0].text == expect_text:
            print('删除成功')
            activeTable._element.getparent().remove(activeTable._element)
def insert_table_after_text(file_name,excel_name,expect_text):
    document = Document(file_name)
    # 因为docx读出来的都是unicode类型的,所以我们要用unicode类型的进行查找
    expect_text=expect_text.decode('utf-8')
    delete_table_with_title(document,expect_text)
    target = None
    for paragraph in document.paragraphs:
        paragraph_text = paragraph.text
        if paragraph_text.endswith(expect_text):
            target = paragraph
            break
    if target is not None:
        records = get_excel_date(excel_name)
        # 获得excel数据的栏数,初始化一个空的table
        col = len(records[0])
        table = document.add_table(rows=1, cols=col)
        table.style = 'Table Grid'
        # 给table加一个表头,并且合并第一栏
        shading_elm_1 = parse_xml(r'<w:shd {} w:fill="D9E2F3"/>'.format(nsdecls('w')))
        table.rows[0].cells[0]._tc.get_or_add_tcPr().append(shading_elm_1)
        table.rows[0].cells[0].text=expect_text
        table_row=table.rows[0]
        first=table_row.cells[0]
        end=table_row.cells[-1]
        first.merge(end)
        # 合并结束,开始把excel里的内容添加到table里
        for tr_list in records:
            row_cells = table.add_row().cells
            index = 0
            for td_list in tr_list:
                row_cells[index].text = td_list
                index = index + 1
        # 把添加的table移动到指定的位置
        move_table_after(table, target)
        # 保存
    document.save(file_name)
if __name__ == '__main__':
    insert_table_after_text('demo2.docx', 'demo.xlsx',"BUG情况表")

最终效果

API测试:通过faker生成测试数据,通过schema检查返回结果

需求

假定有如主图相同的http请求。我们一般的做法是,用postman去抓取http请求,然后修改request的body或者header里的数据,点击send按钮,检查返回的response的body是否正确。
对于输入。一般来说,我们会纯手工,或者半自动的,设计测试用例。例如使用边界值分析,等价类划分等方法,用在我们的输入参数中。比如我参数中的configname最多200个参数,我测试输入201个参数。
对于输出。一般来说,我们大部分时候是肉眼检查,或者写代码,通过jsonpath取参数,然后判断是否存在来检查。
这里我打算用一个新的方法来降低测试的手工特性,让他更自动化一点。以下想法还处于调试阶段,用于大规模使用,暂时不行。

设计

输入修改方案:引入faker库和jsonschema库。通过这两个库,我们可以产生随机的json串
faker是我无意之间发现的,能按照规律产生随机字的库,例如

fake.name()

是产生一个随机的名字,只要加入适当的providers,就能按照需要的规则产生随机字
jsonschema这个用的人很多,这里就不介绍了,下面推荐一个网站,能把json请求转换为schema格式
https://jsonschema.net/
schema中会注明每个字段的规则,例如是string类型还是integer。
输出修改方案:使用jsonschma的validate方法来检查(这种检查方法目前有一些检查不充分,但是已经可以让测试人员减少一些工作量了)

jsonschema.validate(response, schema)

使用方案

1.去postman抓取http请求,并且记录下所需要的输入json和输出json

2.打开https://jsonschema.net/ 把输入json和输入json 转换成jsonschema

3.把输入jsonschema文件,输出文件jsonschema放入相应的目录,自己写一个用于生成随机requestbody的provider和一个测试用的主函数

4.运行测试主入口文件,打印一下发送的json文件,看是不是随机化了,结果是确实随机化了。

代码

测试主入口test_json_from_schema.py

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import json
import faker
import jsonschema
import requests
from jsonschema.exceptions import ValidationError
import jsonprovider
def generate_request(request_json_schema):
    '''
    通过schema生成随机测试数据
    :param request_json_schema:
    :return:
    '''
    fake = faker.Faker()
    fake.add_provider(jsonprovider.JSONProvider)
    request_body = fake.json(json.load(open(request_json_schema)))
    print(request_body)
    return request_body
def check_json_schema(response, schema):
    '''
    通过json_schema检查返回的json串
    :param response:
    :param schema:
    :return:
    '''
    result = True
    try:
        jsonschema.validate(response, schema)
    except ValidationError, e:
        print("fail")
        result = False
    return result
if __name__ == '__main__':
    # 生成request body
    body = generate_request("schema_file/create_config_request_schemas.json")
    # 使用request库发送post请求
    url = "https://dev.honcloud.honeywell.com.cn:8080/dashboard/clustercentre/configmng/newconfig/addconfig"
    headers = {"Content-Type": "application/json", "authorization": "48a5eb61-914e-4b3a-a7a3-0b25f72d06d7"}
    response = requests.post(url, data=body, headers=headers)
    print(response.json())
    response_json=response.json()
    response_schema="schema_file/create_config_response_schemas.json"
    # 用生成的response的schema来检查
    result=check_json_schema(response_json,response_schema)
    print(result)

jsonprovider.py可以自行百度一个faker的provider的方案,我这里做的也不好,随机出来的值只遵循了字符类型,后面会考虑融合我们的边界值分析,等价类划分的方案进来,完善这个jsonprovider.py之后再放出来

python读取excel内容再转变成html添加到outlook中

需求

读取excel里的表格里的内容,然后打开本机的outlook。把excel里的内容添加到正文里,注意。这里是要添加到正文!正文!正文!而不是添加到附件里

设计思路

1.excel处理

打开excel的方法有很多,但是在不知道excel里,行和列的大小的情况下,就能获得excel里的非空值行列的办法不多。我这边采用的是xlwings这个库,用的方法是range.current_region这个方法。这个方法会选择当前range下,有值的区域(非空区域)
通过配置options选项,可以指定excel获得的值的格式,int或者string,或者空值返回N/A

2.打开outlook

打开outlook在windows上只能用win32模块了,通过下面方法可以打开outlook并且创建一个空的email

olook = win32com.client.Dispatch("Outlook.Application")
mail = olook.CreateItem(0)

然后配置邮件的htmlbody和outlook邮件悬停(可以手动更改邮件内容,手动发送),可以用以下方法

mail.HTMLBody = body_html
mail.Display(True)

完整代码

下面是完整代码,读者修改一下excel的路径,就可以自己去试试啦

# -*- coding: UTF-8 -*-
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
import win32com.client
import xlwings
def get_excel_date(filename):
    '''
    获得excel里的所有内容,返回list
    :param filename:  excel路径
    :return: list[list[]]
    '''
    app = xlwings.App(visible=False, add_book=True)
    app.display_alerts = False
    app.screen_updating = False
    wb = app.books.open(filename)
    sht = wb.sheets[0]
    rng = sht.range('A1')
    # 把excel里的数据读取成 年-月-日 时:分:秒的格式
    my_date_handler = lambda year, month, day, hour, minute, second, **kwargs: "%04i-%02i-%02i %02i:%02i:%02i" % (
    year, month, day, hour, minute, second)
    # 取出所有内容,这里用ig这个变量,是为了庆祝I.G获得LOL S8赛季总冠军
    ig = rng.current_region.options(index=False, numbers=int, empty='N/A', dates=my_date_handler)
    result = ig.value
    wb.close()
    app.quit()
    return result
if __name__ == '__main__':
    olook = win32com.client.Dispatch("Outlook.Application")
    mail = olook.CreateItem(0)
    mail.Recipients.Add("357244849@qq.com")
    mail.Subject = "test report"
    body_html = ""
    body_html = body_html + '<body>Hi all:<br/>以下是XXXXX项目今天的测试情况:<br/><br/>明天的测试计划:<br/><br/>目前的bug:'
    body_html = body_html + '<table width="1" border="1" cellspacing="1" cellpadding="1" height="100">'
    # 这里用rng 是因为这一次rng止步8强!
    rng_list = get_excel_date("C:\lzw_programming\resource\reports\CurrentVersionAllDefectTable.xlsx")
    # 表头
    for tr_list in rng_list[:1]:
        body_html = body_html + "<tr>"
        for td_list in tr_list:
            # 这里也是奇葩需求,因为要求表头不能换行,所以用了nowrap
            body_html = body_html + '<th bgcolor="#C3C3C3" nowrap="nowrap">' + td_list + '</th>'
        body_html = body_html + "</tr>"
    # 表内容
    for tr_list in rng_list[1:]:
        body_html = body_html + "<tr>"
        for td_list in tr_list:
            body_html = body_html + "<td>" + td_list + "</td>"
        body_html = body_html + "</tr>"
    body_html = body_html + '</table>'
    body_html = body_html + "</body>"
    mail.HTMLBody = body_html
    mail.Display(True)

苏ICP备18047533号-2