当前位置:首页 >焦点 >远程写入prometheus存储 但是写入有一些情况下

远程写入prometheus存储 但是写入有一些情况下

2024-06-29 05:18:01 [百科] 来源:避面尹邢网

远程写入prometheus存储

作者:linux运维菜 运维 系统运维 prometheus一般都是远程采用pull方式获取数据,但是写入有一些情况下,不方便配置exporter,存储就希望能通过push的远程方式上传指标数据。

[[410873]]

简介

prometheus一般都是写入采用pull方式获取数据,但是存储有一些情况下,不方便配置exporter,远程就希望能通过push的写入方式上传指标数据。

1、存储可以采用pushgateway的远程方式,推送到pushgateway,写入然后prometheus通过pushgateway拉取数据。存储

远程写入prometheus存储 但是写入有一些情况下

2、远程在新版本中增加了一个参数:--enable-feature=remote-write-receiver,写入允许远程通过接口/api/v1/write,存储直接写数据到prometheus里面。

远程写入prometheus存储 但是写入有一些情况下

pushgateway在高并发的情况下还是比较消耗资源的,特别是开启一致性检查,高并发写入的时候特别慢。

远程写入prometheus存储 但是写入有一些情况下

第二种方式少了一层转发,速度应该比较快。

远程写入prometheus存储

接口

可以通过prometheus的http接口/api/v1/write提交数据,这个接口的数据格式有有要求:

  • 使用POST方式提交
  • 需要经过protobuf编码,依赖github.com/gogo/protobuf/proto
  • 可以使用snappy进行压缩,依赖github.com/golang/snappy

步骤:

  1. 收集指标名称,时间戳,值和标签
  2. 将数据转换成prometheus需要的数据格式
  3. 使用proto对数据进行编码,并用snappy进行压缩
  4. 通过httpClient提交数据
  1. package prome 
  2.  
  3. import ( 
  4.     "bufio" 
  5.     "bytes" 
  6.     "context" 
  7.     "io" 
  8.     "io/ioutil" 
  9.     "net/http" 
  10.     "net/url" 
  11.     "regexp" 
  12.     "time" 
  13.  
  14.     "github.com/gogo/protobuf/proto" 
  15.     "github.com/golang/snappy" 
  16.     "github.com/opentracing-contrib/go-stdlib/nethttp" 
  17.     opentracing "github.com/opentracing/opentracing-go" 
  18.     "github.com/pkg/errors" 
  19.     "github.com/prometheus/common/model" 
  20.     "github.com/prometheus/prometheus/pkg/labels" 
  21.     "github.com/prometheus/prometheus/prompb" 
  22.  
  23. type RecoverableError struct {  
  24.     error 
  25.  
  26. type HttpClient struct {  
  27.     url     *url.URL 
  28.     Client  *http.Client 
  29.     timeout time.Duration 
  30.  
  31. var MetricNameRE = regexp.MustCompile(`^[a-zA-Z_:][a-zA-Z0-9_:]*$`) 
  32.  
  33. type MetricPoint struct {  
  34.     Metric  string            `json:"metric"` // 指标名称 
  35.     TagsMap map[string]string `json:"tags"`   // 数据标签 
  36.     Time    int64             `json:"time"`   // 时间戳,单位是秒 
  37.     Value   float64           `json:"value"`  // 内部字段,最终转换之后的float64数值 
  38.  
  39. func (c *HttpClient) remoteWritePost(req []byte) error {  
  40.     httpReq, err := http.NewRequest("POST", c.url.String(), bytes.NewReader(req)) 
  41.     if err != nil {  
  42.         return err 
  43.     } 
  44.     httpReq.Header.Add("Content-Encoding", "snappy") 
  45.     httpReq.Header.Set("Content-Type", "application/x-protobuf") 
  46.     httpReq.Header.Set("User-Agent", "opcai") 
  47.     httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") 
  48.     ctx, cancel := context.WithTimeout(context.Background(), c.timeout) 
  49.     defer cancel() 
  50.  
  51.     httpReq = httpReq.WithContext(ctx) 
  52.  
  53.     if parentSpan := opentracing.SpanFromContext(ctx); parentSpan != nil {  
  54.         var ht *nethttp.Tracer 
  55.         httpReq, ht = nethttp.TraceRequest( 
  56.             parentSpan.Tracer(), 
  57.             httpReq, 
  58.             nethttp.OperationName("Remote Store"), 
  59.             nethttp.ClientTrace(false), 
  60.         ) 
  61.         defer ht.Finish() 
  62.     } 
  63.  
  64.     httpResp, err := c.Client.Do(httpReq) 
  65.     if err != nil {  
  66.         // Errors from Client.Do are from (for example) network errors, so are 
  67.         // recoverable. 
  68.         return RecoverableError{ err} 
  69.     } 
  70.     defer func() {  
  71.         io.Copy(ioutil.Discard, httpResp.Body) 
  72.         httpResp.Body.Close() 
  73.     }() 
  74.  
  75.     if httpResp.StatusCode/100 != 2 {  
  76.         scanner := bufio.NewScanner(io.LimitReader(httpResp.Body, 512)) 
  77.         line := "" 
  78.         if scanner.Scan() {  
  79.             line = scanner.Text() 
  80.         } 
  81.         err = errors.Errorf("server returned HTTP status %s: %s", httpResp.Status, line) 
  82.     } 
  83.     if httpResp.StatusCode/100 == 5 {  
  84.         return RecoverableError{ err} 
  85.     } 
  86.     return err 
  87.  
  88. func buildWriteRequest(samples []*prompb.TimeSeries) ([]byte, error) {  
  89.  
  90.     req := &prompb.WriteRequest{  
  91.         Timeseries: samples, 
  92.     } 
  93.     data, err := proto.Marshal(req) 
  94.     if err != nil {  
  95.         return nil, err 
  96.     } 
  97.     compressed := snappy.Encode(nil, data) 
  98.     return compressed, nil 
  99.  
  100. type sample struct {  
  101.     labels labels.Labels 
  102.     t      int64 
  103.     v      float64 
  104.  
  105. const ( 
  106.     LABEL_NAME = "__name__" 
  107.  
  108. func convertOne(item *MetricPoint) (*prompb.TimeSeries, error) {  
  109.     pt := prompb.TimeSeries{ } 
  110.     pt.Samples = []prompb.Sample{ { }} 
  111.     s := sample{ } 
  112.     s.t = item.Time 
  113.     s.v = item.Value 
  114.     // name 
  115.     if !MetricNameRE.MatchString(item.Metric) {  
  116.         return &pt, errors.New("invalid metrics name") 
  117.     } 
  118.     nameLs := labels.Label{  
  119.         Name:  LABEL_NAME, 
  120.         Value: item.Metric, 
  121.     } 
  122.     s.labels = append(s.labels, nameLs) 
  123.     for k, v := range item.TagsMap {  
  124.         if model.LabelNameRE.MatchString(k) {  
  125.             ls := labels.Label{  
  126.                 Name:  k, 
  127.                 Value: v, 
  128.             } 
  129.             s.labels = append(s.labels, ls) 
  130.         } 
  131.     } 
  132.  
  133.     pt.Labels = labelsToLabelsProto(s.labels, pt.Labels) 
  134.     // 时间赋值问题,使用毫秒时间戳 
  135.     tsMs := time.Unix(s.t, 0).UnixNano() / 1e6 
  136.     pt.Samples[0].Timestamp = tsMs 
  137.     pt.Samples[0].Value = s.v 
  138.     return &pt, nil 
  139.  
  140. func labelsToLabelsProto(labels labels.Labels, buf []*prompb.Label) []*prompb.Label {  
  141.     result := buf[:0] 
  142.     if cap(buf) < len(labels) {  
  143.         result = make([]*prompb.Label, 0, len(labels)) 
  144.     } 
  145.     for _, l := range labels {  
  146.         result = append(result, &prompb.Label{  
  147.             Name:  l.Name, 
  148.             Value: l.Value, 
  149.         }) 
  150.     } 
  151.     return result 
  152.  
  153. func (c *HttpClient) RemoteWrite(items []MetricPoint) (err error) {  
  154.     if len(items) == 0 {  
  155.         return 
  156.     } 
  157.     ts := make([]*prompb.TimeSeries, len(items)) 
  158.     for i := range items {  
  159.         ts[i], err = convertOne(&items[i]) 
  160.         if err != nil {  
  161.             return 
  162.         } 
  163.     } 
  164.     data, err := buildWriteRequest(ts) 
  165.     if err != nil {  
  166.         return 
  167.     } 
  168.     err = c.remoteWritePost(data) 
  169.     return 
  170.  
  171. func NewClient(ur string, timeout time.Duration) (c *HttpClient, err error) {  
  172.     u, err := url.Parse(ur) 
  173.     if err != nil {  
  174.         return 
  175.     } 
  176.     c = &HttpClient{  
  177.         url:     u, 
  178.         Client:  &http.Client{ }, 
  179.         timeout: timeout, 
  180.     } 
  181.     return 

测试

prometheus启动的时候记得加参数--enable-feature=remote-write-receiver

  1. package prome 
  2.  
  3. import ( 
  4.     "testing" 
  5.     "time" 
  6.  
  7. func TestRemoteWrite(t *testing.T) {  
  8.     c, err := NewClient("http://localhost:9090/api/v1/write", 10*time.Second) 
  9.     if err != nil {  
  10.         t.Fatal(err) 
  11.     } 
  12.     metrics := []MetricPoint{  
  13.         { Metric: "opcai1", 
  14.             TagsMap: map[string]string{ "env": "testing", "op": "opcai"}, 
  15.             Time:    time.Now().Add(-1 * time.Minute).Unix(), 
  16.             Value:   1}, 
  17.         { Metric: "opcai2", 
  18.             TagsMap: map[string]string{ "env": "testing", "op": "opcai"}, 
  19.             Time:    time.Now().Add(-2 * time.Minute).Unix(), 
  20.             Value:   2}, 
  21.         { Metric: "opcai3", 
  22.             TagsMap: map[string]string{ "env": "testing", "op": "opcai"}, 
  23.             Time:    time.Now().Unix(), 
  24.             Value:   3}, 
  25.         { Metric: "opcai4", 
  26.             TagsMap: map[string]string{ "env": "testing", "op": "opcai"}, 
  27.             Time:    time.Now().Unix(), 
  28.             Value:   4}, 
  29.     } 
  30.     err = c.RemoteWrite(metrics) 
  31.     if err != nil {  
  32.         t.Fatal(err) 
  33.     } 
  34.     t.Log("end...") 

使用go test进行测试

  1. go test -v 

总结

这个方法也是在看夜莺v5的代码的时候发现的,刚好有需要统一收集redis的监控指标,刚好可以用上,之前用pushgateway写的实在是慢。

 

责任编辑:姜华 来源: 今日头条 prometheus监控远端服务

(责任编辑:知识)

    推荐文章
    • 养殖贷款怎么申请?在网上查询的征信有什么区别?

      养殖贷款怎么申请?在网上查询的征信有什么区别?养殖贷款怎么申请?向提供养殖贷款的银行提交《养殖业贷款申请书》、本人的身份证件及相关资料即可申请。申请以后到获得贷款将经历以下流程:1、调查贷款银行将对借款人提交的资料及还款能力进行调查,并实地调查抵 ...[详细]
    • 苹果预计将在当地时间9月12日发布iPhone 15系列

      苹果预计将在当地时间9月12日发布iPhone 15系列北京商报讯记者 陶凤 王柱力)8月30日,北京商报记者获悉,苹果秋季发布会将于当地时间9月12日举办,据了解,此次活动将在加利福尼亚州库比蒂诺Apple Park园区的史蒂夫乔布斯剧院举行,邀请函上写 ...[详细]
    • 1499元的荣耀X30发布后,中端手机市场标杆渐显

      1499元的荣耀X30发布后,中端手机市场标杆渐显如果大家想要选购一款表现均衡的终端产品,那么荣耀X30绝对不会让大家失望。12月16日,荣耀发布了万众期待的X系列新机——荣耀X30。作为荣耀八周年的回归之作,荣耀X30最大的亮点当属价格重回八年前。 ...[详细]
    • 海尔空调发布大数据新品 将转型“生态服务”

      海尔空调发布大数据新品 将转型“生态服务”可以模块组装的空气产品、“空穴来风”走进卧室、圆形神秘空调、桌面空调……近一个月,国内外媒体上不断曝光的“海尔即将于9月发布的多款革命性产品”终于揭开神秘面纱。9月19日,海尔联合中国气象局公共气象服 ...[详细]
    • 世界在建最宽独塔混合梁斜拉桥主塔基础工程完工 大桥全长1010米

      世界在建最宽独塔混合梁斜拉桥主塔基础工程完工 大桥全长1010米近日,由中国铁建所属中国铁建大桥局承建的世界在建最宽独塔混合梁斜拉桥——成都市东西城市轴线沱江大桥主塔承台浇筑完成,标志着大桥全面进入主塔施工阶段。东西城市轴线是成都市力推&l ...[详细]
    • VR创业更应该追求用户体验

      VR创业更应该追求用户体验“以人的健康为出发点的VR技术开发与内容开发才是VR创业的正确路径。”超维星球执行董事、中国3D产业联盟副秘书长盛志鼎在1月8日“VR家长汇”上说道。“VR家长汇”是工信部中国3D产业联盟、超维星球召 ...[详细]
    • PCP是什么意思?

      PCP是什么意思?程序化创意或程序化创意平台,programmatic creative platform)是一种由数据和算法驱动,通过对广告创意内容进行智能制作和创意优化,从而整合互联网创意产业上下游的技术。人工智能 ...[详细]
    • 智能自行车团队基本概念BiCi 获百万美金天使投资

      智能自行车团队基本概念BiCi    获百万美金天使投资今天,国内创业团队基本概念BICI的创始人蒋旻宸透露,将在11月22日正式发布第一辆成型的智能自行车产品。同时公司也刚刚获得了来自国内一线基金联创投资机构的数百万美金天使投资,估值超过千万美金。在蒋旻 ...[详细]
    • 有速度有质量!安徽省前三季度规上制造业增加值增长14%

      有速度有质量!安徽省前三季度规上制造业增加值增长14%记者近日从省财政厅获悉,安徽省持续优化营商环境,大力推动制造业“减负”,持续落实减税降费决策部署,前三季度,规上工业经济呈现有规模、有速度、有质量、可持续的良好发展态势,规模以 ...[详细]
    • 小米8SE京东热卖 骁龙710售价1399元起

      小米8SE京东热卖 骁龙710售价1399元起小米8SE目前正在京东热销,优惠价仅1399元起。【PChome手机频道资讯报道】京东11.11全球好物节持续进行中,今天带来一款性价比极高的小米8 SE。小米8 SE搭载了高通骁龙710移动平台,标 ...[详细]
    热点阅读