Category Archive : Elasticsearch

ECK安装elasticsearch,接入apm测试

任务目标

以前都是用helm安装elasticsearch,最近发现elasticsearch推荐使用ECK在K8S上安装,那我们就来试试吧

我们会在已有的K8S上安装ECK,elasticsearch,kibana,apm,关闭ssl,loadbalancer暴露应用访问

测试golang接入apm

ECK创建过程

1.先安装上operator

kubectl create -f https://download.elastic.co/downloads/eck/1.7.1/crds.yaml
kubectl apply -f https://download.elastic.co/downloads/eck/1.7.1/operator.yaml

2.安装elasticsearch

cat <<EOF | kubectl apply -f -
apiVersion: elasticsearch.k8s.elastic.co/v1
kind: Elasticsearch
metadata:
  name: quickstart
spec:
  version: 7.14.1
  nodeSets:
  - name: default
    count: 1
    config:
      node.store.allow_mmap: false
EOF

3.安装kibana

cat <<EOF | kubectl apply -f -
apiVersion: kibana.k8s.elastic.co/v1
kind: Kibana
metadata:
  name: quickstart
spec:
  version: 7.14.1
  count: 1
  elasticsearchRef:
    name: quickstart
EOF

4.安装apm

cat <<EOF | kubectl apply -f -
apiVersion: apm.k8s.elastic.co/v1
kind: ApmServer
metadata:
  name: apm-server-quickstart
  namespace: default
spec:
  version: 7.14.1
  count: 1
  elasticsearchRef:
    name: quickstart
EOF

5.暴露kibana可外部访问,并且关闭ssl

kubectl edit kibanas.kibana.k8s.elastic.co quickstart。这里只贴上关键的spec部分代码

spec:
  count: 1
  elasticsearchRef:
    name: quickstart
  enterpriseSearchRef:
    name: ""
  http:
    service:
      metadata: {}
      spec:
        type: LoadBalancer
    tls:
      selfSignedCertificate:
        disabled: true

6.暴露apm可外部访问

kubectl edit apmserver.apm.k8s.elastic.co/apm-server-quickstart

修改的内容与上面kibana修改内容一致。


7.获取kibana登录用户名和密码

默认用户名 elastic

默认密码使用如下命令获取

kubectl get secret quickstart-es-elastic-user -o go-template='{{.data.elastic | base64decode }}'

8.获取apm-server的secret-token

kubectl get secret/apm-server-quickstart-apm-token -o go-template='{{index .data "secret-token" | base64decode}}'

golang测试APM-SERVER通信

1.设置环境变量,

# 服务名,不设置的话,就是代码的文件名
export ELASTIC_APM_SERVICE_NAME=

# apm服务器地址
export ELASTIC_APM_SERVER_URL=http://localhost:8200

# 我们上一步拿到的token
export ELASTIC_APM_SECRET_TOKEN=

# 可以设置也可以不设置,用于标识环境的,类似标签功能
export ELASTIC_APM_ENVIRONMENT=

2.编写golang测试代码main.go

package main

import (
	"fmt"
	"log"
	"net/http"

	"github.com/gorilla/mux"
	"go.elastic.co/apm/module/apmgorilla"
)

func helloHandler(w http.ResponseWriter, req *http.Request) {
	fmt.Fprintf(w, "Hello, %s!\n", mux.Vars(req)["name"])
}
func main() {
	r := mux.NewRouter()
	r.HandleFunc("/hello/{name}", helloHandler)
	r.Use(apmgorilla.Middleware())
	log.Fatal(http.ListenAndServe(":8000", r))
}

3,在kibana上检查apm的信息,应该会看到一个main的server,有一些数据,如下图所示,证明apm可成功连通

DataStreams+logstash+ILM进行日志定时删除,节省硬盘资源

背景

目前所有的K8S上的容器日志都被收集到了我们的ELK上,随着时间的推移,ELK上的日志所占的存储空间越来越多,我们需要一个定时清理的策略,以节约硬盘资源。
我们主要配置以下ELK里的这几个地方

  • 通过kibana新增一个lifecycle policies
  • 通过kibana新增一个index template,注意配置DataStreams
  • logstash 的logstashPipeline
  • filebeat的filebeat.yml文件

简要配置图


Kibana上添加 Lifecycle Policies

1.点击菜单栏的【management】->点击【stack management】
2.点击DATA目录下的【Index Lifecycle Policies】
3.点击【Create policy】创建一个新的生命周期规则
4.测试的话,规则就随便配置一个每10分钟迭代一个新的,删除超过1小时的index
期望效果:
对应的index会从00001开始每隔10分钟往上+1,同时最多存在7个index。


Kibana上添加Index Templates

1.点击菜单栏的【management】->点击【stack management】
2.点击DATA目录下的【Index Management】
3.点击【Index Templates】小标签,【Create template】创建模板
4.index patterns匹配我们logstash上传来的index,比如192*
5.Data stream的配置按钮打开

6.index settings配置上我们上一步添加的Lifecycle Policies

7.mappings参数需要配置【mapped fields】和【Dynamic Template】内容可以从logstash的配置里完整复制过来
8.其他的诸如component telmpalte 和Aliases都不用配置了。保存这个index template就行了


logstash配置

因为要动态生成index,所以要写一些filter规则,这里就不贴出来了。关键注意output里要设置

"action" => "create"和ilm_enabled => false
logstash.conf: |
    input {
      beats {
        port => 5044
      }
      tcp {
        port => 9999
      }
      udp {
        port => 9998
      }
    }
    filter {
      json {
        source => "message"
      }
      if [app_name] {
        mutate {
          add_field => {
            "index_name" => "%{app_name}"
          }
        }
      } else {
        mutate {
          add_field => {
            "index_name" =>  "non_index_log"
          }
        }
      }
    }
    output {
        elasticsearch {
           hosts => ["http://elasticsearch-master-headless:9200"]
           index => "%{index_name}"
           action => "create"
           ilm_enabled => false
        }
        stdout { codec => rubydebug }
    }

filebeat配置

因为有一些日志是通过filebeat传上来的,所以filebeat也要进行少量的配置,传一个app_name到logstash用于生成index

filebeat.inputs:
  - type: log
    paths:
      - "/log/*.log"
processors:
  - decode_json_fields:
        fields: ["message"]
        process_array: false
        max_depth: 1
        target: ""
        overwrite_keys: false
  - add_fields:
      target: ''
      fields:
        app_name: "{{ .Values.nodeSelector.internet_ip}}-aimp-frontend-v2-ux"
output.logstash:
  hosts: ["{{ .Values.logs.logstash.host }}:{{ .Values.logs.logstash.beatport }}"]

从零开始安装istio与skywalking

版本

istio 安装1.8.2版本
skywalking安装8.1.0版本
K8S集群使用rancher安装1.19版本


istio安装

1.下载istio包到k8s的任意一台master机器上

curl -L https://istio.io/downloadIstio | sh -

2. 进入istio目录,设置环境变量,后续我们的istio安装,都在该目录下进行操作

cd istio-1.8.2
export PATH=$PWD/bin:$PATH

3.安装istio,同时设置skywalking-oap地址

输入如下命令
istioctl install \
  --set profile=demo \
  --set meshConfig.enableEnvoyAccessLogService=true \
  --set meshConfig.defaultConfig.envoyAccessLogService.address=skywalking-oap.istio-system:11800 等待出现如下回显即可完成 ✔ Istio core installed
✔ Istiod installed
✔ Egress gateways installed
✔ Ingress gateways installed
✔ Installation complete

4.安装kiali。安装成功后,记得把kiali通过ingress暴露出来,我是使用的traefik来暴露的

kubectl apply -f samples/addons
kubectl rollout status deployment/kiali -n istio-system

skywalking安装

git clone https://github.com/apache/skywalking-kubernetes.git
cd skywalking-kubernetes/chart
helm repo add elastic https://helm.elastic.co
helm dep up skywalking
helm install 8.1.0 skywalking -n istio-system \
  --set oap.env.SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS=k8s-mesh \
  --set fullnameOverride=skywalking \
  --set oap.envoy.als.enabled=true \
  --set ui.image.tag=8.1.0 \
  --set oap.image.tag=8.1.0-es6 \
  --set oap.storageType=elasticsearch

上述命令会安装一个skywalking和一个elasticsearch 6.8.6版本
安装完后,暴露skywalking-ui到外部,让用户可以通过页面访问


安装测试程序

通过以下命令进行安装,安装完成后,可以自己设置一下,通过loadalance暴露到外网访问,loadbalance的IP可以通过metallb搞一个

kubectl apply -f samples/bookinfo/platform/kube/bookinfo.yaml
kubectl apply -f samples/bookinfo/networking/bookinfo-gateway.yaml
kubectl apply -f samples/bookinfo/networking/destination-rule-all.yaml

部署效果

浏览器进入bookinfo测试程序

进入kiali检查出现的流量分布情况

skywalking的界面

Elasticsearch重建索引,收集nginx日志,以request_time为指标分析接口响应时间

起因

filebeat采集nginx的日志,以json格式解析后传入elasticsearch,全部字段都是text格式,我们需要把request_time变成double格式才能使用聚合搜索request_time的最大值.
但是elasticsearch的index一旦建立好之后,字段只能新增,不能修改,所以要修改request_time的数据类型,只能重建索引。
我们的步骤是:1.获得老索引的mapping信息,2.用这个mapping信息新建一个索引 3.用reindex方法,把老索引的数据迁移到新索引 4.确认新索引数据迁移成功,5.删除老索引 6.获得出新索引的mapping,7.使用新索引的mapping创建老索引。8.把新索引的数据倒回老索引 9.删除老索引
假设老索引:V1
临时索引:V2
nginx统计接口路径:path字段
nginx统计响应时间:request_time字段


流程图与说明


python代码

根据path,聚合查询出响应最大时间和平均时间,保留最大响应时间前500个到csv文件里

#
#  created by zhenwei.Li at 2020/11/3 17:50
#
#  filename : example4.py
#  description :
import csv
import json
import requests
if __name__ == '__main__':
    send_json = {
        "query": {
            "bool": {
                "must": [
                    {
                        "range": {
                            "@timestamp": {
                                "gte": 1533556800000,
                                "lte": 1604470685934
                            }
                        }
                    }
                ]
            }
        },
        "size": 0,
        "aggs": {
            "Job_gender_stats": {
                "terms": {
                    "field": "path.keyword",
                    "size": 500,
                    "order": {
                        "max_request_time": "desc"
                    }
                },
                "aggs": {
                    "max_request_time": {
                        "max": {
                            "field": "request_time"
                        }
                    },
                    "avg_request_time": {
                        "avg": {
                            "field": "request_time"
                        }
                    }
                }
            }
        }
    }
    res = requests.post(url="http://192.168.0.174:32164/192.168.0.67-eiop-frontend/_search", json=send_json)
    print(json.dumps(res.json()['aggregations']['Job_gender_stats']['buckets'], sort_keys=True, indent=4))
    buckets = res.json()['aggregations']['Job_gender_stats']['buckets']
    file_handle = open('research.csv', 'w', encoding='utf-8', newline='' "")
    # 2. 基于文件对象构建 csv写入对象
    csv_writer = csv.writer(file_handle)
    # 3. 构建列表头
    csv_writer.writerow(["路径", "出现次数", "平均响应时间(秒)", "最大响应时间(秒)"])
    for item in buckets:
        csv_writer.writerow(
            [item['key'], item['doc_count'], item['avg_request_time']['value'], item['max_request_time']['value']])
    # 5. 关闭文件
    file_handle.close()

效果图

在K8S里使用filebeat作为sidecar收集nginx日志

简介

通过sidecar方法进行接入,与提供日志的容器部署在同一个pod里,主要是配置statefulset里的containers和configmap里的filebeat.yaml
1.把nginx的日志文件挂载在access_log这个volume里,同时在filebeat这个pod里也挂载access_log这个volume
2.filebeat通过subpath的方法挂载单独一个filebeat.yml到/usr/share/filebeat/filebeat.yml。注意,如果不用subpath挂载单个文件的话,是会覆盖掉/usr/share/filebeat/目录的

3.configmap里设置elasticsearch的地址和index,指定日志文件

 

statefulset.yaml

containers:
  - image: nginx:latest
    name: nginx
    ports:
        - containerPort: 80
    volumeMounts:
        - name: access-log #日志同时挂载在nginx和filebeat中
          mountPath: /var/log/nginx/
  - image: docker.elastic.co/beats/filebeat:6.8.12
    imagePullPolicy: Always
    name: filebeat
    volumeMounts:
        - name: access-log #日志同时挂载在nginx和filebeat中
          mountPath: /log
        - name: filebeat-config
          mountPath: /usr/share/filebeat/filebeat.yml
          subPath: filebeat.yml
  volumes:
    - name: filebeat-config
      configMap:
        name: filebeat-config
        items:
        - key: filebeat.yml
          path: filebeat.yml

configmap.yaml

---
apiVersion: v1
kind: ConfigMap
metadata:
  name: filebeat-config
data:
  filebeat.yml: |
    filebeat.inputs:
    - type: log
      paths:
        - "/log/access.log"
    setup.template.name: "filebeat"
    setup.template.pattern: "filebeat-*"
    output.elasticsearch:
      hosts: ["{{ .Values.elastricsearch.addr }}"]
      index: "frontend-filebeat"

 


架构图

logstash6.8.12动态生成elasticsearch的index的正确方法

网上有很多的【假】logstash动态生成index的文章,看了很多,根本不符合我的需求,所以我决定来一篇干货,真正的解决问题。人狠话不多,代码直接上。我是使用官方提供的helm包进行安装的,如果想要完整部署文件,可以来联系我。QQ:357244849


logstash的关键配置.这个配置文件的目的是这样的:

1.使用udp端口9998 接受来自数据源的信息;
2.通过filter插件,先把message转成json格式,这一步很重要,很多文章里都不提这个;
3.判断接受到的数据里,是否包含app_name这个key,如果包含,就把app_name里的值提出来,和当前日期组合到一起,作为传入elasticsearch的index名字。如果不包含,就生成一个字符串non_index_log作为elasticsearch的index名字;
4.把日志转发到elasticsearch里,同时打印在控制台里

    input {
      udp {
        port => 9998
      }
    }
    filter {
      json {
        source => "message"
      }
      if [app_name] {
        mutate {
          add_field => {
            "index_name" => "%{app_name}-%{+YYYY.MM.dd}"
          }
        }
      } else {
        mutate {
          add_field => {
            "index_name" =>  "non_index_log"
          }
        }
      }
    }
    output {
        elasticsearch {
           hosts => ["http://elasticsearch-master:9200"]
           index => "logstash-roc-%{index_name}"
           }
        stdout { codec => rubydebug }
    }

为了测试这样配置之后的logstash效果如何,我们可以使用一个python代码

#  All rights reserved.
#
#  filename : example2.py
#  description :
#
#  created by zhenwei.Li at 2020/9/2 15:52
import logging
import logstash
import sys
host = '192.168.0.12'
test_logger = logging.getLogger('python-logstash-logger')
test_logger.setLevel(logging.INFO)
test_logger.addHandler(logstash.LogstashHandler(host, 9998, version=1))
test_logger.error('python-logstash: test logstash error message.')
# add extra field to logstash message
extra = {
    'app_name': 'temp-lizhenwei-python2',
    'test_string': 'python version: ' + repr(sys.version_info),
    'test_boolean': True,
    'test_dict': {'a': 1, 'b': 'c'},
    'test_float': 1.23,
    'test_integer': 123,
    'type': 'temp-lishuai-python3'
}
test_logger.info('python-logstash: test extra fields', extra=extra)

通过运行上述代码,我们可以去检查logstash的控制台是否产生了index_name,理论上来说,会产生一个
"index_name" => "temp-lizhenwei-python2-2020.09.03"和"index_name" => "non_index_log"如下图所示


最后,我们去kibana里面检查一下elasticsearch的index,确实产生了我们需要的两个index


苏ICP备18047533号-2