# Capture a numbered screenshot of every search-result item on the page.
# Fix: the original incremented its counter *before* the first screenshot,
# so files started at image/2.png; enumerate(start=1) numbers from 1.
elements = page.query_selector_all(".search-result__item")
for number, item in enumerate(elements, start=1):
    # Nudge the page so lazily-loaded items are rendered before capture.
    page.mouse.wheel(0, 100)
    item.screenshot(path=f"image/{number}.png")
    # Give the scroll/render a moment to settle before the next shot.
    time.sleep(1)
{{ .Values.Count }} items are made of {{ .Values.Material }}
{{ .Values.Material }} items are made of {{ .Values.Material }}
{{ .Values.Material }} items are made of {{ .Values.Count }}
{{ .Values.mqtt.server }} dadasdjsaijaid
import re
from ruamel import yaml
def traverse(dic, path=None):
    """Walk a nested dict depth-first, yielding (key_path, leaf_value) pairs.

    key_path is the list of keys from the root down to each non-dict leaf;
    a non-dict input yields a single ([], value) pair.
    """
    prefix = path if path else []
    if isinstance(dic, dict):
        for key, value in dic.items():
            yield from traverse(value, prefix + [key])
    else:
        yield prefix, dic
def template_render(source_file, values_file, dest_file):
    """Render Go-style ``{{ .Values.a.b }}`` placeholders with YAML values.

    source_file: path to the template text.
    values_file: YAML file; only the FIRST document is used.
    dest_file:   path the rendered result is written to.
    """
    with open(source_file, 'r', encoding='utf-8') as source:
        origin = source.read()
    with open(values_file, 'r', encoding='utf-8') as values:
        documents = yaml.load_all(values.read(), Loader=yaml.Loader)
        yaml_dict = list(documents)[0]
    for key_path, value in traverse(yaml_dict):
        # Raw strings avoid the invalid-escape SyntaxWarning of "\{" in a
        # plain literal; re.escape keeps dots in the key path literal.
        # The original pattern ended in \}?\} (a typo) and so also matched
        # a lone closing brace; templates always use "}}".
        pattern = r"\{\{ \.Values\." + re.escape('.'.join(key_path)) + r" \}\}"
        # Callable replacement: backslashes in the value stay literal instead
        # of being parsed as regex group references by re.sub.
        origin = re.sub(pattern, lambda m, v=str(value): v, origin)
    with open(dest_file, 'w+', encoding='utf-8') as dest:
        dest.write(origin)
if __name__ == '__main__':
    # Render the sample template against the sample values file.
    template_render('sample.tmpl', 'values.yml', 'result.yaml')
result.yaml 渲染结果
14 items are made of Wool Wool items are made of Wool Wool items are made of 14 172.15.62.2 dadasdjsaijaid
# Load the file-input module so rsyslog can tail arbitrary log files.
module(load="imfile")
# Tail the nginx access log; every line read gets the tag "mywebsite:".
input(type="imfile"
File="/var/log/nginx/access.log"
Tag="mywebsite:")
# omfwd module for forwarding the logs to another tcp server
# Only forward lines carrying our tag, so unrelated syslog traffic stays local.
# NOTE(review): target is a placeholder ("python server IP address" in
# Chinese) — replace it with the real host of the TCP listener on port 6000.
if( $syslogtag == 'mywebsite:') then {
action(type="omfwd" target="python服务器IP地址" port="6000" protocol="tcp"
action.resumeRetryCount="100"
queue.type="linkedList" queue.size="10000")
}
我们可以用一个Dockerfile来运行rsyslog,docker run的时候注意日志的挂载
# Image that runs rsyslog with the nginx-forwarding rule baked in.
# NOTE(review): ubuntu:16.04 is end-of-life — consider a supported base.
FROM ubuntu:16.04
# Install rsyslog and drop the apt cache to keep the layer small.
RUN apt-get update && apt-get install -y rsyslog; \
rm -rf /var/lib/apt/lists/*
# Install the forwarding rule into rsyslog's drop-in config directory.
ADD 51-nginx-forward.conf /etc/rsyslog.d/.
# RUN cat /dev/null> /var/log/mail.log
# Start rsyslog, then tail syslog so the container has a foreground PID 1.
CMD service rsyslog start && tail -f /var/log/syslog
关键点3 python程序通过tcp的方式读取rsyslog
python程序与rsyslog建立tcp连接,可以实时的进行数据库的插入语句
import asyncio
import json
import time
import database_init
class LogAnalyser:
    """Parses raw rsyslog lines (bytes) into nginx access-log fields."""

    def process(self, str_input):
        """Extract (http_type, path, created_at, request_time) from one line.

        str_input: raw bytes from rsyslog; everything before the first '{'
        is syslog framing, the remainder is a JSON access-log record.
        Returns the 4-tuple — the order matters, it feeds the INSERT query
        params directly — or None when the line cannot be parsed.
        """
        text = str_input.decode("utf-8", errors="ignore")
        try:
            # Drop the syslog prefix; keep the JSON payload from '{' onward.
            _, _, payload = text.partition("{")
            data = json.loads("{" + payload)
            created_at = data["time"]
            # "request" looks like "GET /path HTTP/1.1": verb precedes " /".
            http_type = data["request"].split(" /", 1)[0]
            path = data["path"]
            request_time = data["request_time"]
            # (Removed a redundant `if PREFIX in data["path"]` branch that
            # reassigned path to the same value — a no-op in the original.)
            return http_type, path, created_at, request_time
        except Exception as e:
            # Best-effort parser: malformed lines are reported and skipped.
            print("error in read_rsylog.py,Class LogAnalyser,function process")
            print(e)
            return None
async def handle_echo(reader, writer):
    """Consume newline-delimited log lines from one rsyslog TCP connection.

    Each line is parsed by LogAnalyser; successfully parsed tuples are where
    the database insert belongs (left as a placeholder by the original).

    Fixes: the original `if params:` body contained only comments, which is
    a SyntaxError; and @asyncio.coroutine/yield from were removed in
    Python 3.11 — rewritten as a native coroutine.
    """
    log_filter = LogAnalyser()
    while True:
        line = await reader.readline()
        if not line:  # EOF: the peer closed the connection
            break
        params = log_filter.process(line)
        if params:
            # Do the real work here, e.g. the database insert:
            # execute_sql(params=params)
            pass
if __name__ == '__main__':
    # Use ONE shared connection: the original constructed three separate
    # DBConnect instances, so CURSOR and CONN belonged to different
    # connections and the final close() calls targeted the wrong objects.
    _db = database_init.DBConnect()
    CURSOR = _db.CURSOR
    CONN = _db.CONN
    PREFIX = _db.CONFIG["TEST_SWAGGER"]["PREFIX"]
    _db.create_table()

    loop = asyncio.get_event_loop()
    # host=None binds all interfaces on port 6000 (rsyslog's omfwd target).
    # The `loop=` keyword was removed from asyncio.start_server in 3.10.
    coro = asyncio.start_server(handle_echo, None, 6000)
    server = loop.run_until_complete(coro)

    # Serve requests until Ctrl+C is pressed
    print('Serving on {}'.format(server.sockets[0].getsockname()))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass

    # Close the server, then release the database resources.
    print("Closing the server.")
    server.close()
    loop.run_until_complete(server.wait_closed())
    loop.close()
    CURSOR.close()
    CONN.close()