投资菜地

屋后的一亩三分绿色菜园地

2020-08-06

Influxdb 源码结构

服务端代码结构(尚未完成)

graph TD
A(Server)

B1(MonitorService)
B2(PrecreatorService)
B3(SnapshotterService)
B4(ContinuousQueryService)
B5(HTTPDService)
    B5_A1(handler)
        B5_B1(serveOptions)
        B5_B2(serveQuery)
            B5_B2_A1(InfluxQL)
        B5_B3(serveWrite)
            B5_B3_A1(PointWriter)
                B5_B3_A1_A1(TSM-engine)
                    B5_B3_A1_A1_A1(WAL)
                    B5_B3_A1_A1_A2(FileStore)
                    B5_B3_A1_A1_A3(Compactor)
                B5_B3_A1_A2(TSI)
        B5_B4(servePromWrite)
        B5_B5(servePromRead)
        B5_B6(servePing)
        B5_B7(serveStatus)
        B5_B8(promhttp_ServeHTTP)
B6(StorageService)
B7(RetentionPolicyService)
B8(GraphiteService/CollectdService/OpenTSDBService/UDPService)

A-->B1
A-->B2
A-->B3
A-->B4
A-->B5
A-->B6
A-->B7
A-->|可选|B8
B5-->B5_A1

subgraph HTTPDService
B5_A1-->|query-options/write-options|B5_B1
B5_A1-->|query|B5_B2
B5_A1-->|write|B5_B3
B5_A1-->|prometheus-write|B5_B4
B5_A1-->|prometheus-read|B5_B5
B5_A1-->|ping/ping-head|B5_B6
B5_A1-->|status/status-head|B5_B7
B5_A1-->|prometheus-metrics|B5_B8
B5_B3-->B5_B3_A1
end

subgraph InfluxQL
B5_B2-->B5_B2_A1
end

subgraph TSDB
B5_B3_A1-->B5_B3_A1_A1
B5_B3_A1_A2
end


subgraph TSM-engine
B5_B3_A1_A1-->B5_B3_A1_A1_A1
B5_B3_A1_A1-->B5_B3_A1_A1_A2
B5_B3_A1_A1-->B5_B3_A1_A1_A3
end

B5_B2_A1-->B5_B3_A1_A1
B5_B2_A1-->B5_B3_A1_A2

server

Server表示元数据、存储数据和服务的容器。它是使用配置来构建的,并以适当的顺序管理所有服务的启动和关闭。

service

server所包含的服务

模块名称 功能
MonitorService 给用户提供一些统计,诊断数据
PrecreatorService 创建一个定时函数,每隔一定时间间隔,检查并按需预先创建一次shard
SnapshotterService 接受并处理跟snapshot相关的命令
ContinuousQueryService 持续查询,通常用来压缩/聚合数据
HTTPDService 对外提供服务的主要协议,通过http协议查询数据、数据写入、修改database、增加用户等操作
StorageService  
RetentionPolicyService 定期检查每个database的过期策略,将满足过期条件的ShardGroup/Shard删除
   
UDPService 监听线路协议的传入数据包
GraphiteService 兼容Graphite监控数据接收协议
CollectdService 兼容Collectd监控数据接收协议
OpenTSDBService 兼容OpenTSDB监控数据接收协议

service接入 ```go func (s *Server) Open() error { // Start profiling, if set. startProfile(s.CPUProfile, s.MemProfile)

// Open shared TCP connection.
ln, err := net.Listen("tcp", s.BindAddress)
if err != nil {
	return fmt.Errorf("listen: %s", err)
}
s.Listener = ln

// Multiplex listener.
mux := tcp.NewMux()
go mux.Serve(ln)

// Append services.
s.appendMonitorService()
s.appendPrecreatorService(s.config.Precreator)
s.appendSnapshotterService()
s.appendContinuousQueryService(s.config.ContinuousQuery)
s.appendHTTPDService(s.config.HTTPD)
s.appendStorageService(s.config.Storage)
s.appendRetentionPolicyService(s.config.Retention)
for _, i := range s.config.GraphiteInputs {
	if err := s.appendGraphiteService(i); err != nil {
		return err
	}
}
for _, i := range s.config.CollectdInputs {
	s.appendCollectdService(i)
}
for _, i := range s.config.OpenTSDBInputs {
	if err := s.appendOpenTSDBService(i); err != nil {
		return err
	}
}
for _, i := range s.config.UDPInputs {
	s.appendUDPService(i)
}

s.Subscriber.MetaClient = s.MetaClient
s.PointsWriter.MetaClient = s.MetaClient
s.Monitor.MetaClient = s.MetaClient

s.SnapshotterService.Listener = mux.Listen(snapshotter.MuxHeader)
....... } ```

——————————————————————

HTTPDservice

是对外提供服务的主要协议,通过http协议查询数据、数据写入、修改database、增加用户等操作。

模块名称 功能
serveOptions 返回一个空响应
serveQuery 解析查询语句,如果有有效则执行查询
serveWrite 接收传入的数据,并写入数据库
servePromWrite 将Prometheus的远程查询请求转换成存储查询,并以Prometheus的格式返回
servePromRead 接收Prometheu的远程写入协议中的数据,并写入数据库
servePing 响应客户端的ping命令
serveStatus 已废弃,用ping代替
promhttp_ServeHTTP

通过路由规则配置http请求的处理器 ```go // NewService returns a new instance of Service. func NewService(c Config) Service { s := &Service{ addr: c.BindAddress, https: c.HTTPSEnabled, cert: c.HTTPSCertificate, key: c.HTTPSPrivateKey, limit: c.MaxConnectionLimit, err: make(chan error), unixSocket: c.UnixSocketEnabled, unixSocketPerm: uint32(c.UnixSocketPermissions), bindSocket: c.BindSocket, Handler: NewHandler(c), Logger: zap.NewNop(), } if s.key == “” { s.key = s.cert } if c.UnixSocketGroup != nil { s.unixSocketGroup = int(c.UnixSocketGroup) } s.Handler.Logger = s.Logger return s }

func NewHandler(c Config) *Handler { h := &Handler{ mux: pat.New(), Config: &c, Logger: zap.NewNop(), CLFLogger: log.New(os.Stderr, “[httpd] “, 0), Store: storage.NewStore(), stats: &Statistics{}, requestTracker: NewRequestTracker(), }

// Limit the number of concurrent & enqueued write requests.
h.writeThrottler = NewThrottler(c.MaxConcurrentWriteLimit, c.MaxEnqueuedWriteLimit)
h.writeThrottler.EnqueueTimeout = c.EnqueuedWriteTimeout

// Disable the write log if they have been suppressed.
writeLogEnabled := c.LogEnabled
if c.SuppressWriteLog {
	writeLogEnabled = false
}

h.AddRoutes([]Route{
	Route{
		"query-options", // Satisfy CORS checks.
		"OPTIONS", "/query", false, true, h.serveOptions,
	},
	Route{
		"query", // Query serving route.
		"GET", "/query", true, true, h.serveQuery,
	},
	Route{
		"query", // Query serving route.
		"POST", "/query", true, true, h.serveQuery,
	},
	Route{
		"write-options", // Satisfy CORS checks.
		"OPTIONS", "/write", false, true, h.serveOptions,
	},
	Route{
		"write", // Data-ingest route.
		"POST", "/write", true, writeLogEnabled, h.serveWrite,
	},
	Route{
		"prometheus-write", // Prometheus remote write
		"POST", "/api/v1/prom/write", false, true, h.servePromWrite,
	},
	Route{
		"prometheus-read", // Prometheus remote read
		"POST", "/api/v1/prom/read", true, true, h.servePromRead,
	},
	Route{ // Ping
		"ping",
		"GET", "/ping", false, true, h.servePing,
	},
	Route{ // Ping
		"ping-head",
		"HEAD", "/ping", false, true, h.servePing,
	},
	Route{ // Ping w/ status
		"status",
		"GET", "/status", false, true, h.serveStatus,
	},
	Route{ // Ping w/ status
		"status-head",
		"HEAD", "/status", false, true, h.serveStatus,
	},
	Route{
		"prometheus-metrics",
		"GET", "/metrics", false, true, promhttp.Handler().ServeHTTP,
	},
}...)

return h }

# 读写流程

## 数据写入代码调用流程

graph TD A(ServeWrite) B(PointWriter.WritePoints) C(PointsWriter.WritePointsPrivileged) D(PointsWriter.writeToShard) E(Store.WriteToShard) F(Shard.WritePoints) G(Engine.WritePoints)

A–>B B–>C C–>|获取points的ShardID|D D–>|TSDBStore|E E–>|根据ShardID获取Shard|F F–>|TSM engine|G



## 数据读取代码调用流程

graph TD A(ServeQuery) B(Parser.ParseQuery) C(QueryExecutor.ExecuteQuery)

A–>|influxql.NewParser创建解析器|B B–>C ```

TSM-engine读写重要接口

接口 功能
CreateIterator 返回表的游标
- KeyCursor 返回指定时间开始的游标
- MeasurementTagKeysByExpr 查询索引找到符合条件的数据源
WritePoints 写数据到engine
- ScheduleFullCompaction 压缩内存数据写入TSM文件

开源项目(更多)