Monthly archive: September 2020

Using filebeat as a sidecar to collect nginx logs in K8S

Introduction

We hook in via the sidecar approach: the log shipper is deployed in the same pod as the container that produces the logs. The work is mainly in the containers section of the StatefulSet and the filebeat.yml inside the ConfigMap:

1. Mount nginx's log directory into the access-log volume, and mount the same access-log volume into the filebeat container.
2. Mount a single filebeat.yml into /usr/share/filebeat/filebeat.yml via subPath. Note that without subPath, mounting the single file would mask the whole /usr/share/filebeat/ directory.
3. In the ConfigMap, set the Elasticsearch address and index, and point filebeat at the log file.

statefulset.yaml

containers:
  - image: nginx:latest
    name: nginx
    ports:
      - containerPort: 80
    volumeMounts:
      - name: access-log # the log volume is mounted into both nginx and filebeat
        mountPath: /var/log/nginx/
  - image: docker.elastic.co/beats/filebeat:6.8.12
    imagePullPolicy: Always
    name: filebeat
    volumeMounts:
      - name: access-log # the log volume is mounted into both nginx and filebeat
        mountPath: /log
      - name: filebeat-config
        mountPath: /usr/share/filebeat/filebeat.yml
        subPath: filebeat.yml
volumes:
  - name: access-log # shared log volume; an emptyDir is assumed here
    emptyDir: {}
  - name: filebeat-config
    configMap:
      name: filebeat-config
      items:
        - key: filebeat.yml
          path: filebeat.yml

configmap.yaml

---
apiVersion: v1
kind: ConfigMap
metadata:
  name: filebeat-config
data:
  filebeat.yml: |
    filebeat.inputs:
    - type: log
      paths:
        - "/log/access.log"
    setup.template.name: "filebeat"
    setup.template.pattern: "filebeat-*"
    output.elasticsearch:
      hosts: ["{{ .Values.elasticsearch.addr }}"]
      index: "frontend-filebeat"
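
To verify that documents are actually arriving, you can query the Elasticsearch HTTP API. A minimal sketch in Python (the address is an assumption; substitute whatever .Values.elasticsearch.addr resolves to, e.g. the in-cluster service):

import requests  # third-party: pip install requests

# Assumed in-cluster address; replace with your Elasticsearch endpoint.
ES_ADDR = "http://elasticsearch-master:9200"

# _cat/indices prints index name, health and doc count as plain text.
resp = requests.get(ES_ADDR + "/_cat/indices/frontend-filebeat?v")
print(resp.text)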

Architecture diagram

[docker] Using rsyslog to record nginx logs and forward them to a Python program

This post documents how I packaged rsyslog into a Docker image, picked up nginx's access log, and forwarded it to a Python program.

Key point 1: nginx log configuration

The nginx log must be written in JSON format; nginx.conf is shown below. In the Docker image, nginx.conf can be mounted in via a volume, and /var/log/nginx/access.log can be mounted out to the host.

user  root;
worker_processes  1;
error_log  /var/log/nginx/error.log warn;
pid        /var/run/nginx.pid;
events {
    worker_connections  1024;
}
http {
    include       /etc/nginx/mime.types;
    default_type  application/octet-stream;
    #log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
    #                  '$status $body_bytes_sent "$http_referer" '
    #                  '"$http_user_agent" "$http_x_forwarded_for"';
    log_format main '{"time_local": "$time_local", '
                    '"path": "$request_uri", '
                    '"ip": "$remote_addr", '
                    '"time": "$time_iso8601", '
                    '"user_agent": "$http_user_agent", '
                    '"user_id_got": "$uid_got", '
                    '"user_id_set": "$uid_set", '
                    '"remote_user": "$remote_user", '
                    '"request": "$request", '
                    '"status": "$status", '
                    '"body_bytes_sent": "$body_bytes_sent", '
                    '"request_time": "$request_time", '
                    '"http_referrer": "$http_referer" }';
    access_log  /var/log/nginx/access.log  main;
    sendfile        on;
    #tcp_nopush     on;
    keepalive_timeout  65;
    #gzip  on;
    include /etc/nginx/conf.d/*.conf;
}
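
Since every access-log line is now a standalone JSON object, downstream consumers can parse it with any JSON library. A quick sanity check in Python against a fabricated sample line:

import json

# Fabricated sample line with a subset of the fields defined in log_format above.
sample = ('{"time_local": "21/Sep/2020:10:00:00 +0000", "path": "/api/v1/ping", '
          '"request": "GET /api/v1/ping HTTP/1.1", "status": "200", '
          '"request_time": "0.001" }')

data = json.loads(sample)
print(data["path"], data["status"], data["request_time"])

One caveat: with nginx's default log escaping, a double quote inside a value such as $http_user_agent is written as \x22, which is not valid JSON; nginx 1.11.8+ supports the escape=json parameter on log_format to handle this properly.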

Key point 2: rsyslog configuration and Dockerfile

Write a 51-nginx-forward.conf file and drop it into /etc/rsyslog.d/:

module(load="imfile")
input(type="imfile"
      File="/var/log/nginx/access.log"
      Tag="mywebsite:")
# omfwd module for forwarding the logs to another tcp server
if( $syslogtag == 'mywebsite:')  then {
  action(type="omfwd" target="<python-server-IP>" port="6000" protocol="tcp"
            action.resumeRetryCount="100"
            queue.type="linkedList" queue.size="10000")
}

We can run rsyslog from a small Dockerfile; when you docker run it, pay attention to how the logs are mounted (see the example after the Dockerfile).

FROM ubuntu:16.04
RUN apt-get update && apt-get install -y rsyslog; \
    rm -rf /var/lib/apt/lists/*
ADD 51-nginx-forward.conf /etc/rsyslog.d/.
# RUN cat /dev/null> /var/log/mail.log
CMD service rsyslog start && tail -f /var/log/syslog
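
When starting the container, mount the nginx log directory so that rsyslog's imfile input can read access.log. A hypothetical invocation (the host path and image name are placeholders):

docker run -d --name rsyslog-forwarder \
    -v /data/nginx-logs:/var/log/nginx \
    my-rsyslog-image:latest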

Key point 3: the Python program reads from rsyslog over TCP

The Python program accepts the TCP connection from rsyslog and can run the database INSERT statements in real time:

import asyncio
import json

import database_init


class LogAnalyser:
    def __init__(self):
        pass

    def process(self, str_input):
        # print(str_input)
        str_input = str_input.decode("utf-8", errors="ignore")
        # Add your processing steps here
        # ...
        try:
            # The forwarded line is "<syslog header> {json...}":
            # keep everything from the first "{" and parse it as JSON.
            str_splits = str_input.split("{", 1)
            json_text = "{" + str_splits[1]
            data = json.loads(json_text)
            created_at = data["time"]
            request_all = data["request"].split(" /", 1)
            http_type = request_all[0]
            request_time = data["request_time"]
            if PREFIX in data["path"]:
                path = data["path"]
                return http_type, path, created_at, request_time  # The order is relevant for INSERT query params
        except Exception as e:
            print("error in read_rsylog.py, class LogAnalyser, function process")
            print(e)
        return None


async def handle_echo(reader, writer):
    log_filter = LogAnalyser()
    while True:
        line = await reader.readline()
        if not line:
            break
        params = log_filter.process(line)
        if params:
            # do the real work here, e.g. insert into the database
            # execute_sql(params=params)
            pass


if __name__ == '__main__':
    CURSOR = database_init.DBConnect().CURSOR
    CONN = database_init.DBConnect().CONN
    PREFIX = database_init.DBConnect().CONFIG["TEST_SWAGGER"]["PREFIX"]
    database_init.DBConnect().create_table()
    loop = asyncio.get_event_loop()
    coro = asyncio.start_server(handle_echo, None, 6000)
    server = loop.run_until_complete(coro)
    # Serve requests until Ctrl+C is pressed
    print('Serving on {}'.format(server.sockets[0].getsockname()))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    # Close the server
    print("Closing the server.")
    server.close()
    loop.run_until_complete(server.wait_closed())
    loop.close()
    CURSOR.close()
    CONN.close()
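
To exercise the server without a real rsyslog in front of it, a quick smoke test can push one hand-crafted line at it. This is a sketch: the syslog header is only an approximation of what imfile/omfwd emit, but process() only looks at everything from the first "{":

import socket

# One fake line: a syslog-ish prefix with the "mywebsite:" tag, then the JSON payload.
line = ('<190>Sep 21 10:00:00 host mywebsite: '
        '{"time": "2020-09-21T10:00:00+00:00", "path": "/api/v1/ping", '
        '"request": "GET /api/v1/ping HTTP/1.1", "request_time": "0.001"}\n')

with socket.create_connection(("127.0.0.1", 6000)) as s:
    s.sendall(line.encode("utf-8"))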

The right way to dynamically generate Elasticsearch indices with logstash 6.8.12

There are plenty of fake "dynamic logstash index" articles online. I read a lot of them and none fit my need, so I decided to write one with the real goods that actually solves the problem. Less talk, more code. I installed logstash with the official Helm chart; if you want the complete deployment files, contact me at QQ: 357244849.


The key logstash configuration. What this config file does:

1. Listen on UDP port 9998 for messages from the data source;
2. In the filter stage, parse message as JSON first; this step is essential, and most articles never mention it;
3. Check whether the parsed event contains the key app_name: if it does, take its value, combine it with the current date, and use that as the Elasticsearch index name; if not, fall back to the fixed string non_index_log (both get the logstash-roc- prefix in the output section);
4. Ship the event to Elasticsearch and also print it to the console.

    input {
      udp {
        port => 9998
      }
    }
    filter {
      json {
        source => "message"
      }
      if [app_name] {
        mutate {
          add_field => {
            "index_name" => "%{app_name}-%{+YYYY.MM.dd}"
          }
        }
      } else {
        mutate {
          add_field => {
            "index_name" =>  "non_index_log"
          }
        }
      }
    }
    output {
        elasticsearch {
           hosts => ["http://elasticsearch-master:9200"]
           index => "logstash-roc-%{index_name}"
           }
        stdout { codec => rubydebug }
    }

To test how logstash behaves with this configuration, we can use a small Python script (it uses the python-logstash package):

#  All rights reserved.
#
#  filename : example2.py
#  description :
#
#  created by zhenwei.Li at 2020/9/2 15:52
import logging
import logstash
import sys
host = '192.168.0.12'
test_logger = logging.getLogger('python-logstash-logger')
test_logger.setLevel(logging.INFO)
test_logger.addHandler(logstash.LogstashHandler(host, 9998, version=1))
test_logger.error('python-logstash: test logstash error message.')
# add extra field to logstash message
extra = {
    'app_name': 'temp-lizhenwei-python2',
    'test_string': 'python version: ' + repr(sys.version_info),
    'test_boolean': True,
    'test_dict': {'a': 1, 'b': 'c'},
    'test_float': 1.23,
    'test_integer': 123,
    'type': 'temp-lishuai-python3'
}
test_logger.info('python-logstash: test extra fields', extra=extra)

Run the code above, then check the logstash console for index_name. In theory it will produce both
"index_name" => "temp-lizhenwei-python2-2020.09.03" and "index_name" => "non_index_log" in the rubydebug output.


Finally, check the Elasticsearch indices in Kibana: the two indices we needed were indeed created.

Updating a latest image with helm

Quite a few people have told me that when helm updates a StatefulSet or Deployment that uses a latest image, nothing actually gets redeployed. This problem is easy to solve. One option is a git hash, see https://www.yinyubo.cn/?p=535;
the other is the approach in this post: add an environment variable.
helm works by comparing rendered YAML: if the YAML has not changed, nothing is redeployed. To get a latest image rolled out, with the old pod terminating and a new one running, we only need the StatefulSet or Deployment YAML to change on every upgrade. The code below does exactly that:

containers:
        - image: '<image-name>:latest'
          imagePullPolicy: Always
          env:
            - name: upgrade_time
              value: {{ date "2006-01-02-150405" .Release.Time }}

The image tag is latest and imagePullPolicy is Always. The env block adds an upgrade_time variable whose value comes from helm's date function, so the rendered YAML changes on every helm upgrade and each upgrade pulls the newest image. After deploying, you can run kubectl exec -it <pod-name> sh and inspect upgrade_time in the container's env to confirm the rollout. (Note that .Release.Time was removed in Helm 3; {{ now | date "2006-01-02-150405" }} is the equivalent there.)
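
As a rough usage sketch (release, chart, and pod names are placeholders):

helm upgrade my-release ./my-chart
kubectl get pods                       # the old pod goes Terminating, a fresh one comes up Running
kubectl exec -it <pod-name> -- sh -c 'echo $upgrade_time'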
Simple, right? Give it a try.

