QuXiao's Blog

Life && Tech && Thoughts

Open-Falcon 源码阅读(三) Graph

Written on

Graph

Graph所做的事情,就是把用户每次push上来的数据,进行采样存储,并提供查询接口。

参考RRDtool的理念,在数据每次存入的时候,会自动进行采样、归档。在默认的归档策略,一分钟push一次的频率下,历史数据保存5年。同时为了不丢失信息量,数据归档的时候,会按照平均值采样、最大值采样、最小值采样存三份。用户在查询某个metric,在过去一年的历史数据时,Graph会选择最合适的采样频率,返回采样过后的数据,提高了数据查询速度。

存储

首先,Graph接收到来自Transfer模块的数据之后,并不是立即将数据保存至RRD,而且先保存在一组内存队列(store.GraphItems)中,后续在定期将在这些队列中的数据进行落地至RRD存储。内存队列分为两类:未处理过索引的队列、以及处理过索引的队列。 Graph模块接收到的数据,格式如下:

当Graph模块接收到数据(item)之后,会计算出两种Key:UUID和Md5。他们的计算公式可以简写为:
  • item.UUID = item.endpoint + item.metric + item.tags + item.dstype + item.step
  • item.md5 = item.endpoint + item.metric + item.tags

之后,Graph会将item发向三个地方:

  • 历史数据(store.GraphItems)
  • 索引数据
  • 实时数据(HistoryCache,虽然叫history,但是其实只保存了每个指标的最近3份数据)

历史数据方面,Graph直接将item存入某一个RRD文件中。其中,RRD文件和item本身的关系如下:

RRDFileName = rrd_base_path + item.md5 + item.dstype + item.step

具体是:

filename := fmt.Sprintf("%s/%s/%s_%s_%d.rrd", g.Config().RRD.Storage, md5[0:2], md5, dsType, step)

初始化RRD时,还会设置采样以及保存周期,代码如下:

const (
    RRA1PointCnt   = 720 // 1m一个点存12h
    RRA5PointCnt   = 576 // 5m一个点存2d
    RRA20PointCnt  = 504 // 20m一个点存7d
    RRA180PointCnt = 766 // 3h一个点存3month
    RRA720PointCnt = 730 // 12h一个点存1year
)

// 设置各种归档策略
// 1分钟一个点存 12小时
c.RRA("AVERAGE", 0.5, 1, RRA1PointCnt)
// 为什么1分钟粒度的点没有最大/最小值?

// 5m一个点存2d
c.RRA("AVERAGE", 0.5, 5, RRA5PointCnt)
c.RRA("MAX", 0.5, 5, RRA5PointCnt)
c.RRA("MIN", 0.5, 5, RRA5PointCnt)

// 20m一个点存7d
c.RRA("AVERAGE", 0.5, 20, RRA20PointCnt)
c.RRA("MAX", 0.5, 20, RRA20PointCnt)
c.RRA("MIN", 0.5, 20, RRA20PointCnt)

// 3小时一个点存3个月
c.RRA("AVERAGE", 0.5, 180, RRA180PointCnt)
c.RRA("MAX", 0.5, 180, RRA180PointCnt)
c.RRA("MIN", 0.5, 180, RRA180PointCnt)

// 12小时一个点存1year
c.RRA("AVERAGE", 0.5, 720, RRA720PointCnt)
c.RRA("MAX", 0.5, 720, RRA720PointCnt)
c.RRA("MIN", 0.5, 720, RRA720PointCnt)

索引

这里的『索引』,指的是item本身字段的一些对应关系,最终落地至数据库中,其中包括3张表:

  1. endpoint表 endpoint(endpoint, ts, t_create)
  2. tag_endpoint表 tag_endpoint(tag, endpoint_id, ts, t_create)
  3. endpoint_counter表 endpoint_counter(endpoint_id,counter,step,type,ts,t_create)

当索引还没有落地的时候,首先保存为索引缓存的形式,索引缓存结构如下:

其中,data的key是item.md5,value是IndexCacheItem结构。

索引缓存分为两部分:indexedItemCache 和 unIndexedItemCache,数据先进到unIndexedItemCache中,会有定时goroutine将其中的数据写入索引表中,之后将数据放入indexedItemCache中,用于用户的查询。(注:索引的key是item.md5,因此索引缓存中保存的数量可以理解为监测指标的种类)

查询

请求中的查询参数如下所示:

各字段的含义,在介绍Agent的章节中已经有所提及了,这里不再赘述。接收到查询请求之后的流程,可以总结为以下:

  1. 根据endpoint和counter,从索引中获取dsType和step
    1. endpoint + counter => md5
    2. 从indexedItemCache查找md5对应的item
    3. 没有找到的话,从DB中进行查找
  2. 开始/结束时间,按照step进行取整

  3. 根据endpoint、counter、dsType、step,获取对应的RRD文件名

  4. 从cache中查询数据
    1. 根据cache key获取items和flag
  5. 从历史数据中查询数据
    1. 如果cfg支持migrate,以及判断查询数据不在这个Graph实例,则从其它Graph实例进行查询
    2. 否则,查询本地rrd文件
  6. 将cache中的数据,以及rrd/其它Graph实例中的数据,进行合并

数据迁移

Graph模块支持在集群成员改变的情况下,将其它Graph模块的数据拉取过来,或者向其它Graph模块发送本地的数据,达到数据迁移的目的。 数据迁移相关的配置样例如下:

字段含义为:

  • enabled:是否启动迁移功能
  • concurrency:每个一致性哈希节点,负责迁移数据的工作协程数
  • replicas:一致性哈希中,每份数据的重复数
  • cluster:节点名称以及具体IP端口

模块启动时,迁移的工作会一并启动。于每一个节点(Graph实例),会启动concurrency个数个工作协程。每个迁移工作协程,从节点的channel中抓取任务,任务类型包括:

  • NET_TASK_M_SEND:向其它Graph模块发送某个md5(指标)的数据
  • NET_TASK_M_QUERY:向其它Graph模块发起查询请求
  • NET_TASK_M_PULL:从其它Graph模块拉取数据,然后保存至本地

这个迁移的功能并不能提供实时数据迁移,即对于某个cluster的宕机,其它节点无法感知,一致性哈希也没有重新分配节点,虽然配置可以reload,但是一致性哈希的关系在启动时已经确定。相关代码如下:

func migrate_start(cfg *g.GlobalConfig) {
        var err error
        var i int
        if cfg.Migrate.Enabled {
                Consistent.NumberOfReplicas = cfg.Migrate.Replicas

                for node, addr := range cfg.Migrate.Cluster {
                        Consistent.Add(node)        // 一致性哈希的节点已经确定
                        Net_task_ch[node] = make(chan *Net_task_t, 16)
                        clients[node] = make([]*rpc.Client, cfg.Migrate.Concurrency)

                        for i = 0; i < cfg.Migrate.Concurrency; i++ {
                                if clients[node][i], err = dial(addr, time.Second); err != nil {
                                        log.Fatalf("node:%s addr:%s err:%s\n", node, addr, err)
                                }
                                go net_task_worker(i, Net_task_ch[node], &clients[node][i], addr)
                        }
                }
        }
}

-- EOF --

comments powered by Disqus