Category Archive : python相关

生成oracle客户端docker镜像的两种玩法

背景

我们的oracle服务端是oracle12g版本,应用程序均由golang或者python编写,运行在k8s 容器里,那我们就需要build一些docker容器来,那如何build呢?可以参考以下我的方法。文章最后有我编写过程中的参考文章,也可以根据参考文章自己创新。

玩法1:参考oracle官方文档制作

1.下载代码https://github.com/oracle/docker-images.git 到本地

2.进入OracleInstantClient/oraclelinux8/21/目录,该目录下有一个原始的dockerfile文件,可以使用该文件build一个基础镜像,例如

docker build --pull -t oracle/instantclient:21 .

使用build出来的这个oracle/instantclient:21镜像可以二次进行dockerfile编辑加入golang或者python。

也可以用这个oracle/instantclient:21来测试一下oracle数据库是否能正常连接。测试连接命令如下:

docker run -ti --rm oracle/instantclient:21 sqlplus 用户名/密码@数据库IP:数据库端口/数据库名

玩法2:从debian开始制作一个镜像

除了上面的方法外,我们还可以从debian开始制作一个包含python的镜像

1.进入oracle客户端下载页https://www.oracle.com/database/technologies/instant-client/linux-x86-64-downloads.html

2.下载https://download.oracle.com/otn_software/linux/instantclient/218000/instantclient-basic-linux.x64-21.8.0.0.0dbru.zip到本地,如下图

3.编写Dockerfile,以debian+oracle+python举例

FROM debian:11-slim

LABEL maintainer="zhenwei.li <zhenwei.li@sfere-elec.com>"
RUN set -eux \
    && sed -i "s@http://ftp.debian.org@https://repo.huaweicloud.com@g" /etc/apt/sources.list \
    && sed -i "s@http://security.debian.org@https://repo.huaweicloud.com@g" /etc/apt/sources.list \
    && apt-get update \
    && apt-get install -y -q libaio1 unzip python3 pip
    && pip install cx_Oracle

# 清理垃圾
RUN set -eux \
    && apt-get clean \
    && rm -rf /var/lib/apt/lists/* \
    && rm -rf /tmp/*
ENV TZ=Asia/Shanghai \
    DEBIAN_FRONTEND=noninteractive

RUN ln -fs /usr/share/zoneinfo/${TZ} /etc/localtime \
    && echo ${TZ} > /etc/timezone \
    && dpkg-reconfigure --frontend noninteractive tzdata \
    && rm -rf /var/lib/apt/lists/*

COPY instantclient-basic-linux.x64-21.8.0.0.0dbru.zip /opt/oracle/instantclient-basic-linux.x64-21.8.0.0.0dbru.zip

WORKDIR /opt/oracle/

RUN unzip instantclient-basic-linux.x64-21.8.0.0.0dbru.zip

RUN sh -c "echo /opt/oracle/instantclient_21_8 > /etc/ld.so.conf.d/oracle-instantclient.conf"

RUN ldconfig

RUN useradd sfere

4. 目录下放Dockerfile和oracle客户端zip包

5. 制作镜像

docker build -t debian-oracle .

6.运行镜像,测试python连接oracle服务端可行,依次输入如下命令

docker run -ti --rm debian-oracle python
import cx_Oracle as cx
con = cx.connect('用户名', '密码', '数据库IP:数据库端口/数据库名')

参考文章

https://github.com/oracle/docker-images/tree/main/OracleInstantClient

https://csiandal.medium.com/install-oracle-instant-client-on-ubuntu-4ffc8fdfda08

playwright拖拽元素和获取元素集合

背景

因为这两个功能经常用,网上又很少有直接可以抄的代码,所以我写个文章记录一下

元素向下拖拽100个px

拖拽演示

解说:先定位到要拖拽的元素,然后鼠标移动到元素的中心点,接着鼠标选中,鼠标移动,鼠标放下

src_elem = page.locator("xpath=元素定位")
box = src_elem.bounding_box(timeout=60000)
page.mouse.move(box["x"] + box["width"] / 2, box["y"] + box["height"] / 2)
page.mouse.down()
page.mouse.move(box["x"] + box["width"] / 2, box["y"] + box["height"] / 2 + 100)
page.mouse.up()

元素拖拽到另外一个元素上

这种方法也有用,比如把一个按钮丢到另外一个画板里

src_elem = page.locator("xpath=元素原来的")
src_elem.drag_to(page.locator("xpath=元素新的位置"))

获取一批元素集合,然后截图

我们的页面上有一批图片,以瀑布流的方式展示出来,他们的css都是相同的,我们需要给每一个元素截图,并且截图之后再鼠标向下滚动一下

elements = page.query_selector_all(".search-result__item")
number = 1
for item in elements:
    number = number + 1
    page.mouse.wheel(0, 100)
    item.screenshot(path="image/"+str(number) + ".png")
    time.sleep(1)

python通过sdk从minio下载文件时添加进度条

背景

Minio是就地环境下比较好用的对象存储工具,适合在CI/CD流程中使用。主要是因为GIT里用LFS来放大文件不妥,把部署流程中需要的中间文件放minio上,通过SDK去存取文件非常方便。

Minio的上传文件fput_object有progress参数,但是下载文件fget_object默认没有 progress 参数,所以我们需要自己用get_object对代码稍加改造

涉及到的库

https://github.com/verigak/progress

用于提供进度条

pip install progress
pip install minio

代码

from minio import Minio
from progress.bar import Bar


def get_object_with_progress(client, bucket_name, object_name):
    try:
        data = client.get_object(bucket_name, object_name)
        total_length = int(data.headers.get('content-length'))
        bar = Bar(object_name, max=total_length / 1024 / 1024, fill='*', check_tty=False,
                  suffix='%(percent).1f%% - %(eta_td)s')
        with open('./' + object_name, 'wb') as file_data:
            for d in data.stream(1024 * 1024):
                bar.next(1)
                file_data.write(d)
        bar.finish()
    except Exception as err:
        print(err)


if __name__ == '__main__':
    client = Minio(
        "play.min.io",
        access_key="Q3AM3UQ867SPQQA43P2F",
        secret_key="zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG",
    )

    bucket_name = "downlaod"
    object_name = "eiop-timescaledb-offline.zip"
    get_object_with_progress(client, bucket_name, object_name)

实现效果

electron+droneCI+minio流水线

背景

因为我们的electron程序已经开发完成,期望要能开发人员每次上传代码,打了tag就自动build一份deb文件,自动上传到minio,方便运维人员去拿deb文件部署到ubuntu环境上。我们已有的技术栈包含droneCI,minio,python,于是边有了该方案。本文省略了vault,ldap,minio,harbor的安装与配置,这些程序的安装配置在本网站的其他文章里,就不一一贴出来了


架构图

解释:

1.前端开发上传electron代码到git服务端

2.git服务端通过webhook方式通知drone-server产生了。例如本文只测试的是发布tag触发webhook,还有很多种触发方式都可以设置

3.drone-server收到通知后,再在drone-runner所在的k8s集群里启动一个包含nodejs和python的任务容器

4.任务容器通过electron-forge make 命令打包一个deb文件

5.任务容器通过minio提供的python sdk上传deb文件到minio


drone插件编写

要完成上述目标,第一步就是得编写一个drone的插件

我编写该插件使用的是nodejs16版本的debian系统,然后通过提前安装好需要的如下表格里的工具。注意,因为我用的是华为源,2021年12月9日的时候,华为镜像上最新的electron只到16.0.2版本,所以注意指定版本号

介绍:该插件使用nodejs16版本的debian系统,然后通过提前安装好需要的如下工具。注意,因为我用的是华为源,2021年12月9日的时候,华为镜像上最新的electron只到16.0.2版本,所以注意指定版本号

工具名
rpm
python3-pip
python3
fakeroot
electron@v16.0.2 
electron-prebuilt-compile
electron-forge 
dpkg
minio的python sdk

代码有3个文件main.py Dockerfile ,requirements.txt,下面是详细介绍

main.py

代码功能是先获取环境变量,然后使用git的tag号替换掉package.json里的version字段。执行yarn install,yanr make,通过环境变量找到需要上传的文件,通过pythonde的sdk上传到minio里。详细代码如下

#main.py
import json
import os
import subprocess

from minio import Minio
from minio.error import S3Error

endpoint = "minio.sfere.local"
access_key = "bababa"
secret_key = "bababa"
bucket = "electronjs"
folder_path = "/drone/src/out/make/deb/x64"
suffix = "deb"
tag = "0.0.0"


def find_file_by_suffix(target_dir, target_suffix="deb"):
    find_res = []
    target_suffix_dot = "." + target_suffix
    walk_generator = os.walk(target_dir)
    for root_path, dirs, files in walk_generator:
        if len(files) < 1:
            continue
        for file in files:
            file_name, suffix_name = os.path.splitext(file)
            if suffix_name == target_suffix_dot:
                find_res.append(os.path.join(root_path, file))
    return find_res


def get_environment():
    global endpoint, access_key, secret_key, bucket, suffix, tag

    if "PLUGIN_ENDPOINT" in os.environ:
        endpoint = os.environ["PLUGIN_ENDPOINT"]
    if "PLUGIN_ACCESS_KEY" in os.environ:
        access_key = os.environ["PLUGIN_ACCESS_KEY"]
    if "PLUGIN_SECRET_KEY" in os.environ:
        secret_key = os.environ["PLUGIN_SECRET_KEY"]
    if "PLUGIN_BUCKET" in os.environ:
        bucket = os.environ["PLUGIN_BUCKET"]
    if "PLUGIN_SUFFIX" in os.environ:
        suffix = os.environ["PLUGIN_SUFFIX"]
    if "PLUGIN_TAG" in os.environ:
        tag = os.environ["PLUGIN_TAG"]


def yarn_make():
    with open('./package.json', 'r', encoding='utf8')as fp:
        json_data = json.load(fp)
    json_data['version'] = tag
    with open('./package.json', 'w', encoding='utf8')as fp:
        json.dump(json_data, fp, ensure_ascii=False, indent=2)
    print('package version replace to ' + tag)
    print(subprocess.run("yarn install", shell=True))
    print(subprocess.run("yarn make", shell=True))


def upload_file():
    file_list = find_file_by_suffix(folder_path, suffix)
    # 创建minio连接,这里因为我们是http的,所以secure=False
    client = Minio(
        endpoint=endpoint,
        access_key=access_key,
        secure=False,
        secret_key=secret_key,
    )

    # 检查bucket是否存在,不存在就创建bucket
    found = client.bucket_exists(bucket)
    if not found:
        client.make_bucket(bucket)
    else:
        print("Bucket 'electronjs' already exists")

    # 上传文件到bucket里
    for file in file_list:
        name = os.path.basename(file)
        client.fput_object(
            bucket, name, file,
        )
        print(
            "'" + file + "' is successfully uploaded as "
                         "object '" + name + "' to bucket '" + bucket + "'."
        )


if __name__ == "__main__":
    get_environment()
    yarn_make()
    try:
        upload_file()
    except S3Error as exc:
        print("error occurred.", exc)

Dockerfile

取一个node16版本的debian系统,使用国内源安装我们在之前列出来要用的工具,然后指定程序入口是我们的python程序。编写完后,使用docker build -t drone-electron-minio-plugin:0.1.0 . 做一个镜像上传到私仓里

FROM node:16-buster
RUN npm config set registry https://mirrors.huaweicloud.com/repository/npm/ \
    && npm config set disturl https://mirrors.huaweicloud.com/nodejs \
    && npm config set sass_binary_site https://mirrors.huaweicloud.com/node-sass \
    && npm config set phantomjs_cdnurl https://mirrors.huaweicloud.com/phantomjs \
    && npm config set chromedriver_cdnurl https://mirrors.huaweicloud.com/chromedriver \
    && npm config set operadriver_cdnurl https://mirrors.huaweicloud.com/operadriver \
    && npm config set electron_mirror https://mirrors.huaweicloud.com/electron/ \
    && npm config set python_mirror https://mirrors.huaweicloud.com/python \
    && npm config set canvas_binary_host_mirror https://npm.taobao.org/mirrors/node-canvas-prebuilt/ \
    && npm install -g npm@8.2.0 \
    && yarn config set registry https://mirrors.huaweicloud.com/repository/npm/ \
    && yarn config set disturl https://mirrors.huaweicloud.com/nodejs \
    && yarn config set sass_binary_site https://mirrors.huaweicloud.com/node-sass \
    && yarn config set phantomjs_cdnurl https://mirrors.huaweicloud.com/phantomjs \
    && yarn config set chromedriver_cdnurl https://mirrors.huaweicloud.com/chromedriver \
    && yarn config set operadriver_cdnurl https://mirrors.huaweicloud.com/operadriver \
    && yarn config set electron_mirror https://mirrors.huaweicloud.com/electron/ \
    && yarn config set python_mirror https://mirrors.huaweicloud.com/python \
    && yarn config set canvas_binary_host_mirror https://npm.taobao.org/mirrors/node-canvas-prebuilt/ \
    && yarn global add electron@v16.0.2 electron-forge electron-prebuilt-compile\
    && sed -i "s@http://ftp.debian.org@https://repo.huaweicloud.com@g" /etc/apt/sources.list \
    && sed -i "s@http://security.debian.org@https://repo.huaweicloud.com@g" /etc/apt/sources.list \
    && sed -i "s@http://deb.debian.org@https://repo.huaweicloud.com@g" /etc/apt/sources.list \
    && apt update \
    && apt install -y fakeroot dpkg rpm python3 python3-pip
ADD . .   
WORKDIR . 
RUN pip3 install -r ./requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple
#CMD ["python3","/main.py"]
WORKDIR /drone/src
ENTRYPOINT ["python3", "/main.py"]

requirements.txt

minio==7.1.2

electron仓库代码

我们的electron仓库里要添加一个.drone.yml文件和对package.json稍微进行一些修改

package.json

.drone.yml

droneCI的流水线文件,使用了我们在上一节里build出来的drone插件镜像


流水线演示

需要人手动操作的

流水线自动操作的

用python实现helm template功能

背景

众所周知,helm template 包名 -f values.yaml >输出文件。这个方式能渲染go-template,自动填充{{ .Values.XXX }}参数到文件里。现在有一个需求,需要用python来实现类似的功能。那么就来看看我的最后实现吧


sapmle.tmpl 待填充文件

{{ .Values.Count }} items are made of {{ .Values.Material }}
{{ .Values.Material }} items are made of {{ .Values.Material }}
{{ .Values.Material }} items are made of {{ .Values.Count }}
{{ .Values.mqtt.server }} dadasdjsaijaid

values.yml 参数文件

Count: 14
Material: Wool
mqtt:
  server: 172.15.62.2

python 代码

import re

from ruamel import yaml


def traverse(dic, path=None):
    if not path:
        path = []
    if isinstance(dic, dict):
        for x in dic.keys():
            local_path = path[:]
            local_path.append(x)
            for b in traverse(dic[x], local_path):
                yield b
    else:
        yield path, dic


def template_render(source_file, values_file, dest_file):
    with open(source_file, 'r') as source:
        origin = source.read()

    with open(values_file, 'r', encoding='utf-8') as vaules:
        result = yaml.load_all(vaules.read(), Loader=yaml.Loader)
        yaml_dict = list(result)[0]
    for x in traverse(yaml_dict):
        match = "\{\{ \.Values." + '.'.join(x[0]) + " \}?\}"
        origin = re.sub(match, str(x[1]), origin)

    with open(dest_file, 'w+') as dest:
        dest.write(origin)


if __name__ == '__main__':
    template_render('sample.tmpl', "values.yml","result.yaml")

result.yaml 渲染结果

14 items are made of Wool
Wool items are made of Wool
Wool items are made of 14
172.15.62.2 dadasdjsaijaid

自定义一个kaniko镜像

背景

kaniko是一款方便我们从K8S内部构建docker容器的工具,以前我们在CI过程中,使用的是docker-in-docker技术,这种技术最主要的缺陷就是当一台机器上同时运行多个docker build流水线时,会出现阻塞的情况,因为这一批流水线用的是宿主机上的同一个docker进程。
基于这种情况,我们在droneCI流水线中换用了kaniko来进行docker镜像的创建。

遇到的难题

  1. kaniko是基于scratch构建的,里面没有shell,所以想在kaniko原生镜像里在调用python是很麻烦的
  2. kaniko创建docker镜像使用的是file system功能,如果想在一个kaniko容器里先创建ubuntu镜像,再创建alpine镜像, 是会有各种冲突问题的,需要使用–cleanup功能。此功能会清空文件系统,同时如果有自己装的shell,也会被清空,导致无法再次使用

解决方案

  1. kaniko的关键文件其实是/kaniko目录下的哪些 二进制文件,官方推荐是用gcr.io/kaniko-project/executor 镜像,其实我们可以拷贝这个/kaniko目录到我们自己的私有镜像
  2. shell没有的话,我们可以拷贝一个busybox进去,这样就有shell了
  3. 虽然–cleanup会清空file system,但是根据代码里的ignorepath设定,volume挂载目录和/kaniko目录会被忽略掉。所以我们可以有两种方式选择:一、通过volume的方式哦挂载busybox和自己的python代码到私有镜像里。二、把busybox和python代码加入/kaniko目录。

示例代码

Dockerfile如下:

FROM heiheidoc/kaniko-project-executor:v1.3.0 AS plugin

# 1.6.0的clean up有问题 https://github.com/GoogleContainerTools/kaniko/issues/1586

FROM heiheidoc/kaniko-project-executor:debug AS debug

FROM python:3.9.5-buster

COPY --from=背景plugin /kaniko /kaniko

COPY --from=debug /busybox /kaniko/busybox

ADD . /kaniko

ENV DOCKER_CONFIG /kaniko/.docker

CMD ["python3","/kaniko/main.py"]


部分python代码如下,功能是按照一定规则生成Docker镜像:

def run_shell(shell):
    print_green(shell)
    cmd = subprocess.Popen(shell, stdin=subprocess.PIPE, stderr=sys.stderr, close_fds=True,
                           stdout=sys.stdout, universal_newlines=True, shell=True,executable='/kaniko/busybox/sh', bufsize=1)
    cmd.communicate()
    return cmd.returncode
def run_executor():
    for folder_name, sub_dir, files in os.walk(os.getcwd()):
        if 'Dockerfile' in files:
            Dockefile_path = folder_name + "/Dockerfile"
            docker_info = folder_name.replace(os.getcwd(),'').split('/')
            harbor_image_name = REGISTRY_URL + "/" + owner_name + "/" + docker_info[1] + ":" + docker_info[2]
            cmd_build = "/kaniko/executor --cache=true --cache-dir=/cache --cleanup --skip-tls-verify --skip-tls-verify-pull --skip-tls-verify-registry" \
                        " --dockerfile=" + Dockefile_path + \
                        " --context=dir://" + folder_name + \
                        " --destination=" + harbor_image_name
            assert run_shell(cmd_build) == 0, "镜像build失败: " + harbor_image_name
if __name__ == "__main__":
    run_executor()

Docker安装redis环形集群

背景

因为有需求要3台机器来做一个redis高可用(o(╥﹏╥)哭o~~~),没办法,只能用一个奇怪的方式安装redis集群了。这里我们用的不是主从哨兵哦,阅读本文的作者不要误会啦。再就是,真不推荐用3个机器来做redis集群。

因为配置里的cluster-announce-ip不能用域名,只能用ip,而pod的IP在K8S里经常变,所以不能用K8S部署了,我们这里使用docker和docker-compose部署,总得来说还是很简单的,代码我上传至github上了
https://github.com/lizhenwei/redis-cluster-in-docker.git

拓扑图

假设我们用3台机器,IP地址192.168.0.94,192.168.0.95,192.168.0.96 ;分别是node1,node2,node3

-------    -------    -------
|node1| ---|node2|----|node3|
-------    -------    -------
master1    master2    master3
slaver2    slaver3    slaver1

安装完毕之后,需要我们手动通过`CLUSTER REPLICATE`命令调节master和slaver所处的容器,形成如上拓扑,这样假使node1挂掉的时候,我们的node2上能运行一个master2,node3上运行master3,master1(此处的master1是由slaver1变过来的)

安装方法

1.在每台机器上安装docker和docker-compose。国内可以用daocloud去下载。会比较快http://get.daocloud.io/
2. 在每台机器上下载该代码,进入代码目录,运行shell命令,会根据IP地址更改配置文件,并且创建redis的docker应用:

# 在每台机器上输入该命令,注意替换IP地址
bash docker-init.sh 192.168.0.94
3. 在任意一台机器上通过redis-cli创建集群
# 在每台机器上输入该命令,注意替换IP地址
bash redis-init.sh 192.168.0.94 192.168.0.95 192.168.0.96
# 弹出来的提示直接输入yes
4. 完成之后执行命令,检查集群是否运行成功
#进入redis容器
docker exec -it redis-cluster bash
#检查集群是否运行成功
redis-cli -a 92F1q99f9CnrkAuwJPItdj8brqeMtN3r -p 7000 cluster nodes

python代码访问redis-cluster

$ pip install redis-py-cluster
```
测试代码
```
>>> from rediscluster import RedisCluster
>>> # Requires at least one node for cluster discovery. Multiple nodes is recommended.
>>> startup_nodes = [{"host": "192.168.0.94", "port": "7000"}, {"host": "192.168.0.94", "port": "7001"},{"host": "192.168.0.95", "port": "7000"}, {"host": "192.168.0.95", "port": "7001"},{"host": "192.168.0.96", "port": "7000"}, {"host": "192.168.0.96", "port": "7001"}]
>>> rc = RedisCluster(startup_nodes=startup_nodes, decode_responses=True,password='password')
>>> rc.set("foo", "bar")
True
>>> print(rc.get("foo"))
'bar'

Sanic中添加基于Cron表达式的协程定时任务

需求

从页面上触发某一个接口之后,要在sanic里面添加一个计划任务。
例如:从页面上填写一个con表达式,后台根据此cron表达式定时执行任务

'*/1 * * * *'   # 这个是每1分钟执行一次的表达式

需要用到的python包有:Sanic,Apscheduler


示例代码

import time
from sanic import Sanic
from sanic.response import json
app = Sanic("App Name")
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
def cron_job():
    print('打印当前时间:', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
@app.route('/')
async def index(request):
    # generate a URL for the endpoint `post_handler`
    print("浏览器里打开一下首页")
    scheduler = AsyncIOScheduler()
    scheduler.add_job(cron_job, CronTrigger.from_crontab('*/1 * * * *'))
    scheduler.start()
    return json({"hello": "thank you"})
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8000)

实验效果

Elasticsearch重建索引,收集nginx日志,以request_time为指标分析接口响应时间

起因

filebeat采集nginx的日志,以json格式解析后传入elasticsearch,全部字段都是text格式,我们需要把request_time变成double格式才能使用聚合搜索request_time的最大值.
但是elasticsearch的index一旦建立好之后,字段只能新增,不能修改,所以要修改request_time的数据类型,只能重建索引。
我们的步骤是:1.获得老索引的mapping信息,2.用这个mapping信息新建一个索引 3.用reindex方法,把老索引的数据迁移到新索引 4.确认新索引数据迁移成功,5.删除老索引 6.获得出新索引的mapping,7.使用新索引的mapping创建老索引。8.把新索引的数据倒回老索引 9.删除老索引
假设老索引:V1
临时索引:V2
nginx统计接口路径:path字段
nginx统计响应时间:request_time字段


流程图与说明


python代码

根据path,聚合查询出响应最大时间和平均时间,保留最大响应时间前500个到csv文件里

#
#  created by zhenwei.Li at 2020/11/3 17:50
#
#  filename : example4.py
#  description :
import csv
import json
import requests
if __name__ == '__main__':
    send_json = {
        "query": {
            "bool": {
                "must": [
                    {
                        "range": {
                            "@timestamp": {
                                "gte": 1533556800000,
                                "lte": 1604470685934
                            }
                        }
                    }
                ]
            }
        },
        "size": 0,
        "aggs": {
            "Job_gender_stats": {
                "terms": {
                    "field": "path.keyword",
                    "size": 500,
                    "order": {
                        "max_request_time": "desc"
                    }
                },
                "aggs": {
                    "max_request_time": {
                        "max": {
                            "field": "request_time"
                        }
                    },
                    "avg_request_time": {
                        "avg": {
                            "field": "request_time"
                        }
                    }
                }
            }
        }
    }
    res = requests.post(url="http://192.168.0.174:32164/192.168.0.67-eiop-frontend/_search", json=send_json)
    print(json.dumps(res.json()['aggregations']['Job_gender_stats']['buckets'], sort_keys=True, indent=4))
    buckets = res.json()['aggregations']['Job_gender_stats']['buckets']
    file_handle = open('research.csv', 'w', encoding='utf-8', newline='' "")
    # 2. 基于文件对象构建 csv写入对象
    csv_writer = csv.writer(file_handle)
    # 3. 构建列表头
    csv_writer.writerow(["路径", "出现次数", "平均响应时间(秒)", "最大响应时间(秒)"])
    for item in buckets:
        csv_writer.writerow(
            [item['key'], item['doc_count'], item['avg_request_time']['value'], item['max_request_time']['value']])
    # 5. 关闭文件
    file_handle.close()

效果图

[docker]通过rsyslog记录日志并转发nginx日志到python程序

记录我是如何把rsyslog做成docker镜像,获取nginx的accesslog并且转发到python的

关键点1 nginx日志配置

nginx日志要设置成json格式输出,nginx.conf如下图所示,这个可以在docker镜像中通过volume把nginx.conf挂载进去,然后把/var/log/nginx/access.log挂载到本地

user  root;
worker_processes  1;
error_log  /var/log/nginx/error.log warn;
pid        /var/run/nginx.pid;
events {
    worker_connections  1024;
}
http {
    include       /etc/nginx/mime.types;
    default_type  application/octet-stream;
    #log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
    #                  '$status $body_bytes_sent "$http_referer" '
    #                  '"$http_user_agent" "$http_x_forwarded_for"';
    log_format main '{"time_local": "$time_local", '
   '"path": "$request_uri", '
   '"ip": "$remote_addr", '
   '"time": "$time_iso8601", '
   '"user_agent": "$http_user_agent", '
   '"user_id_got": "$uid_got", '
   '"user_id_set": "$uid_set", '
   '"remote_user": "$remote_user", '
   '"request": "$request", '
   '"status": "$status", '
   '"body_bytes_sent": "$body_bytes_sent", '
   '"request_time": "$request_time", '
   '"http_referrer": "$http_referer" }';
    access_log  /var/log/nginx/access.log  main;
    sendfile        on;
    #tcp_nopush     on;
    keepalive_timeout  65;
    #gzip  on;
    include /etc/nginx/conf.d/*.conf;
}

关键点2 rsyslog配置与Dockerfile

编写一个51-nginx-forward.conf文件放置在/etc/rsyslog.d/下即可

module(load="imfile")
input(type="imfile"
      File="/var/log/nginx/access.log"
      Tag="mywebsite:")
# omfwd module for forwarding the logs to another tcp server
if( $syslogtag == 'mywebsite:')  then {
  action(type="omfwd" target="python服务器IP地址" port="6000" protocol="tcp"
            action.resumeRetryCount="100"
            queue.type="linkedList" queue.size="10000")
}

我们可以用一个Dockerfile来运行rsyslog,docker run的时候注意日志的挂载

FROM ubuntu:16.04
RUN apt-get update && apt-get install -y rsyslog; \
    rm -rf /var/lib/apt/lists/*
ADD 51-nginx-forward.conf /etc/rsyslog.d/.
# RUN cat /dev/null> /var/log/mail.log
CMD service rsyslog start && tail -f /var/log/syslog

关键点3 python程序通过tcp的方式读取rsyslog

python程序与rsyslog建立tcp连接,可以实时的进行数据库的插入语句

import asyncio
import json
import time
import database_init
class LogAnalyser:
    def __init__(self):
        pass
    def process(self, str_input):
        # print(str_input)
        str_input = str_input.decode("utf-8", errors="ignore")
        # Add your processing steps here
        # ...
        try:
            # Extract created_at from the log string
            str_splits = str_input.split("{", 1)
            json_text = "{" + str_splits[1]
            data = json.loads(json_text)
            created_at = data["time"]
            request_all = data["request"].split(" /", 1)
            http_type = request_all[0]
            path = data["path"]
            request_time = data["request_time"]
            if PREFIX in data["path"]:
                path = data["path"]
                return http_type, path, created_at,request_time  # The order is relevant for INSERT query params
        except Exception as e:
            print("error in read_rsylog.py,Class LogAnalyser,function process")
            print(e)
        return None
@asyncio.coroutine
def handle_echo(reader, writer):
    log_filter = LogAnalyser()
    while True:
        line = yield from reader.readline()
        if not line:
            break
        params = log_filter.process(line)
        if params:
            # 进行一堆操作,例如进行数据库的插入
            # execute_sql(params=params)
if __name__ == '__main__':
    CURSOR = database_init.DBConnect().CURSOR
    CONN = database_init.DBConnect().CONN
    PREFIX = database_init.DBConnect().CONFIG["TEST_SWAGGER"]["PREFIX"]
    database_init.DBConnect().create_table()
    loop = asyncio.get_event_loop()
    coro = asyncio.start_server(handle_echo, None, 6000, loop=loop)
    server = loop.run_until_complete(coro)
    # Serve requests until Ctrl+C is pressed
    print('Serving on {}'.format(server.sockets[0].getsockname()))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    # Close the server
    print("Closing the server.")
    server.close()
    loop.run_until_complete(server.wait_closed())
    loop.close()
    CURSOR.close()
    CONN.close()

苏ICP备18047533号-2