higress-core源码分析_博客-Higress官网
铭师堂的云原生升级实践点此了解

higress-core源码分析

SJC

发布时间 2024-1-25


引言

在开源社区中,源码分析是深入理解项目内部机制的关键步骤。通过仔细研究项目的源代码,我们可以揭示背后的设计原理、算法和工作流程。这不仅有助于提高我们的编程技能,还可以为开发者社区提供宝贵的学习资源。

本文将聚焦于分析Higress的源码,该项目在开源界备受瞩目。通过浏览阅读其代码,我们将初步展现其核心特性、架构设计和实现细节。希望各位用户以及开发者在本文中能够找到有价值的见解。

关于Higress

Higress是基于阿里内部的Envoy Gateway实践沉淀、以开源Istio + Envoy为核心构建的下一代云原生网关,实现了流量网关 + 微服务网关 + 安全网关三合一的高集成能力,深度集成Dubbo、Nacos、Sentinel等微服务技术栈,能够帮助用户极大的降低网关的部署及运维成本且能力不打折;在标准上全面支持Ingress与Gateway API,积极拥抱云原生下的标准API规范;同时,Higress Controller也支持Nginx Ingress平滑迁移,帮助用户零成本快速迁移到Higress。

代码目录说明

  • cmd: 命令行参数解析等处理代码
  • pkg/ingress: Ingress 资源转换为 Istio 资源等相关代码
  • pkg/bootstrap: 包括启动 gRPC/xDS/HTTP server 等的代码
  • registry: 实现对接多种注册中心进行服务发现的代码
  • envoy: 依赖的 envoy 官方仓库 commit,以及对应的补丁代码
  • istio: 依赖的 istio 官方仓库 commit,以及对应的补丁代码
  • plugins: Higress 插件 sdk,以及官方内置插件代码
  • script: 编译相关脚本
  • docker: docker 镜像构建相关脚本

本文主要围绕着higress-core组件的源码,研究higress-core自启动以来做了哪些事情。higress-core的代码主要位于pkg目录下。

higress-core源码整体分析

启动入口:pkg/cmd/server.go

在该文件下,主要进行了命令参数的解析,并调用NewServer来创建一个Server实例,调用该实例的Start方法来启动实例Server,调用waitForMonitorSignal方法来监听进程的退出。

显然这里有两个很重要的方法函数:NewServerStart,接下来针对这两个函数进行具体分析。

创建实例:NewServer

初始化Pilot的环境 APImodel.Environment

model.Environment为Pilot 中的核心环境对象,集成了许多不同的组件,包括服务发现、配置存储、网络观察等,以提供一个完整的环境 API。

e := &model.Environment{
PushContext: model.NewPushContext(),
DomainSuffix: constants.DefaultKubernetesDomain,
MCPMode: true,
}

model.Environment设置Ledger

e.SetLedger(buildLedger(args.RegistryOptions))

Ledger 是一个表示经过修改的 map 的接口。这个 map 具有三个独特的特征:

  1. 每个 map 的唯一状态都有一个唯一的哈希值。
  2. map 的先前状态在固定的时间内被保留。
  3. 给定先前的哈希值,我们可以从 map 中检索先前的状态(如果仍然被保留)。

Ledger提供了个接口来获取之前版本的Value,接口详情如下:

// GetPreviousValue executes a get against a previous version of the ledger, using that version's root hash.
GetPreviousValue(previousRootHash, key string) (result string, err error)

model.Environment设置服务发现的接口ServiceDiscovery,用于列举服务和实例。

ac := aggregate.NewController(aggregate.Options{
MeshHolder: e,
})
e.ServiceDiscovery = ac

初始化Server实例

s := &Server{
ServerArgs: args,
httpMux: http.NewServeMux(),
environment: e,
readinessProbes: make(map[string]readinessProbe),
server: server.New(),
}
s.environment.Watcher = mesh.NewFixedWatcher(&v1alpha1.MeshConfig{})
s.environment.Init()

这里说明一下Server结构体的各个字段的含义:

type Server struct {
*ServerArgs // Server参数配置
environment *model.Environment // Pilot环境配置
kubeClient higresskube.Client // 与 Kubernetes 集成的客户端
configController model.ConfigStoreCache // 配置存储控制器
configStores []model.ConfigStoreCache // 多个配置存储的缓存
httpServer *http.Server // HTTP 请求处理器
httpMux *http.ServeMux // HTTP 请求多路复用器
grpcServer *grpc.Server // grpc服务
xdsServer *xds.DiscoveryServer // xds服务
server server.Instance // Pilot Server实例配置
readinessProbes map[string]readinessProbe // server内部服务记录表,记录服务是否准备好
}

在这里,server.Instance维护了一个chan变量components chan ComponentComponent则定义了一个函数方法模板。当我们调用InstanceRunComponent(t Component)方法,则会将变量t发送给components管道,而我们调用InstanceStart(stop <-chan struct{}) error方法,则会从components管道去接收Component对象,并执行该对象的函数方法。

type Component func(stop <-chan struct{}) error

创建初始启动函数列表

initFuncList := []func() error{
s.initKubeClient,
s.initXdsServer,
s.initHttpServer,
s.initConfigController,
s.initRegistryEventHandlers,
s.initAuthenticators,
}
  • initKubeClient函数

    1. 首先判断kubeClient是否为空,不为空进行下一步;

    2. 调用istiokube.DefaultRestConfig方法,创建一个具有 RESTful 风格的Kubernetes Config

    3. Kubernetes Config为入参,调用istiokube.NewClientConfigForRestConfig方法创建一个具有 RESTful 风格的Kubernetes Client Config

    4. 调用higress定义的NewClient方法,集成 Istio 客户端、Higress 客户端和可能的 Kingress 客户端,并为每个客户端设置了相应的 SharedInformerFactory。

  • initXdsServer函数

    1. 调用pliot xdsNewDiscoveryServer方法,创建一个xdsServer实例,并初始化一些资源。
      s.xdsServer = xds.NewDiscoveryServer(s.environment, nil, PodName, PodNamespace, s.RegistryOptions.KubeOptions.ClusterAliases)
      s.xdsServer.McpGenerators[gvk.WasmPlugin.String()] = &mcp.WasmpluginGenerator{Server: s.xdsServer}
      s.xdsServer.McpGenerators[gvk.DestinationRule.String()] = &mcp.DestinationRuleGenerator{Server: s.xdsServer}
      s.xdsServer.McpGenerators[gvk.EnvoyFilter.String()] = &mcp.EnvoyFilterGenerator{Server: s.xdsServer}
      s.xdsServer.McpGenerators[gvk.Gateway.String()] = &mcp.GatewayGenerator{Server: s.xdsServer}
      s.xdsServer.McpGenerators[gvk.VirtualService.String()] = &mcp.VirtualServiceGenerator{Server: s.xdsServer}
      s.xdsServer.McpGenerators[gvk.ServiceEntry.String()] = &mcp.ServiceEntryGenerator{Server: s.xdsServer}
      s.xdsServer.ProxyNeedsPush = func(proxy *model.Proxy, req *model.PushRequest) bool {
      return true
      }
    2. xdsServerStart方法注册给pilot server instance,并调用higress server的initGrpcServer初始化函数,将xds服务注册在grpc服务上,使得可以通过grpc协议端口进行xds协议的通信。
  • initHttpServer函数,初始化http server,添加8888端口的debug接口的处理器,并添加/ready路由的处理器,用于遍历higress server的readinessProbes,判断内部服务记录表内的服务是否准备完毕。

    s.xdsServer.AddDebugHandlers(s.httpMux, nil, true, nil)
    s.httpMux.HandleFunc("/ready", s.readyHandler)
  • initConfigController函数,这部分内容比较核心,将会在后面详细展开,暂时先跳过。

  • initRegistryEventHandlers函数,这部分内容跟initConfigController函数有关,暂时先跳过。

  • initAuthenticators函数,顾名思义,就是初始化认证器,并将其应用于 xDS 服务器。通过添加不同的认证器,可以支持不同的身份验证方式,这样服务器在处理请求时可以验证客户端的身份信息。

遍历启动函数列表,执行启动函数

for _, f := range initFuncList {
if err := f(); err != nil {
return nil, err
}
}

注册Pilot Server

s.server.RunComponent(func(stop <-chan struct{}) error {
s.kubeClient.RunAndWait(stop)
return nil
})

注意这里的kubeClient所调用的方法是istio的Kubernetes客户端提供的。

在ready服务记录表里注册xds服务

s.readinessProbes["xds"] = func() (bool, error) {
return s.xdsServer.IsServerReady(), nil
}

启动实例:Start

  • 第一部分:遍历pilot server的components,执行chan管道里的函数
if err := s.server.Start(stop); err != nil {
return err
}
  • 第二部分:等待缓存同步完成
if !s.waitForCacheSync(stop) {
return fmt.Errorf("failed to sync cache")
}
  • 第三部分:启动grpc服务
grpcListener, err := net.Listen("tcp", s.GrpcAddress)
if err != nil {
return err
}
go func() {
log.Infof("starting gRPC discovery service at %s", grpcListener.Addr())
if err := s.grpcServer.Serve(grpcListener); err != nil {
log.Errorf("error serving GRPC server: %v", err)
}
}()
  • 第四部分:启动http服务
httpListener, err := net.Listen("tcp", s.HttpAddress)
if err != nil {
return err
}
go func() {
log.Infof("starting HTTP service at %s", httpListener.Addr())
if err := s.httpServer.Serve(httpListener); err != nil {
log.Errorf("error serving http server: %v", err)
}
}()

higress-core核心源码分析

在前面有两个函数initConfigControllerinitRegistryEventHandlers我们没有介绍,这两个函数可以说是higress-core的核心代码,接下来对这两个函数进行具体剖析。在分析之前,我们先看一下pkg其他目录的功能。

.
├── common //公共包
├── config //配置包
├── ingress //higress定义的ingress资源控制器
│   ├── config //配置ingress config和kingress config
│   ├── kube //集成Kubernetes配置
│   │   ├── annotations //处理 Ingress 的注解相关的代码
│   │   ├── common //通用的代码和工具函数
│   │   ├── configmap //处理 ConfigMap 相关的代码
│   │   ├── controller //控制器相关的代码
│   │   ├── http2rpc //处理 HTTP 到 RPC 的转换的代码
│   │   ├── ingress //处理 Ingress 配置的代码
│   │   ├── ingressv1 //Ingress 的 API 版本 v1 相关的代码
│   │   ├── kingress //处理 Kingress 相关的代码
│   │   ├── mcpbridge //MCP(Mesh Configuration Protocol)的桥接相关的代码
│   │   ├── secret //处理 Secret 相关的代码
│   │   ├── util //通用的工具函数
│   │   └── wasmplugin //处理 WebAssembly 插件相关的代码
│   ├── log //日志
│   ├── mcp
│   └── translation //翻译模块
└── kube //higress定义的Kubernetes客户端,包含了istio提供的Kubernetes客户端

通过目录我们可以清晰地看到higress集成了多个Kubernetes资源以及Ingress配置。

initConfigController函数剖析

我们先大致分析一下这个函数的流程,函数的具体逻辑稍后分析。最后会给出一个函数调用的流程图,可以结合流程图进行源码的理解分析。

  1. 设置 common.Options 结构体的一些选项,这些选项可能影响配置控制器的行为。这些选项包括是否启用控制器、集群 ID、Ingress 类等
  2. 创建一个 translation.NewIngressTranslation 对象,该对象用于处理 Ingress 的翻译和配置
  3. ingressConfig 中添加本地集群的配置,并获取对应的 Ingress 控制器和 Kingress 控制器
  4. 使用 configaggregate.MakeCache 创建一个带有缓存的配置控制器
  5. 创建一个 Istio 配置存储,并将其设置到 Server 对象的 environment.IstioConfigStore
  6. ingressConfig 设置到 Server 对象的 environment.IngressStore
  7. 将包含配置和启动 Ingress 控制器、Kingress 控制器以及相关的配置控制器的函数方法注册到pilot server instance的components管道中。

我们从中挑选出几个比较重要的函数出来:NewIngressTranslationAddLocalClusterInitializeCluster以及configControllerRun方法,其它的像MakeCacheMakeIstioStore方法则是有istio提供的函数方法,用于对接higress中的discovery容器中Pilot组件。确定好接下来要分析的内容之后,我们来到pkg/ingress/translattion目录,查看一下translation.go

translation.go剖析

IngressTranslation实现了两个接口:model.ConfigStoreCachemodel.IngressStore,其中model.ConfigStoreCache还包含了model.ConfigStore接口。我们可以大致浏览一下,可以发现基本上是调用IngressTranslation结构体中的ingressConfigkingressConfig的方法,位于pkg/ingress/config目录下。

除了实现接口方法之外,还提供了NewIngressTranslationAddLocalClusterInitializeCluster方法。同样地,我们也可以发现这三个方法本质上也是调用ingressConfigkingressConfig的方法。

既然translation.go本质上是调用pkg/ingress/config目录下的函数,不妨我们进入该目录,查看一下ingress_config.go,对于kingress_config.go,我们暂不做分析。

ingress_config.go剖析

ingress_config.go中有一个名为IngressConfig结构体,内容如下(附带注释):

type IngressConfig struct {
// 远程 Ingress 控制器的映射,键为集群 ID,值为 common.IngressController
remoteIngressControllers map[string]common.IngressController
// 用于保护并发访问 remoteIngressControllers 的互斥锁
mutex sync.RWMutex
// Ingress 路由和域名的缓存
ingressRouteCache model.IngressRouteCollection
ingressDomainCache model.IngressDomainCollection
// 本地 Kubernetes 客户端
localKubeClient kube.Client
// 处理 VirtualService 事件的事件处理程序切片
virtualServiceHandlers []model.EventHandler
// 处理 Gateway 事件的事件处理程序切片
gatewayHandlers []model.EventHandler
// 处理 DestinationRule 事件的事件处理程序切片
destinationRuleHandlers []model.EventHandler
// 处理 EnvoyFilter 事件的事件处理程序切片
envoyFilterHandlers []model.EventHandler
// 处理 ServiceEntry 事件的事件处理程序切片
serviceEntryHandlers []model.EventHandler
// 处理 WasmPlugin 事件的事件处理程序切片
wasmPluginHandlers []model.EventHandler
// 用于处理监视错误的处理程序
watchErrorHandler cache.WatchErrorHandler
// 缓存的 EnvoyFilter 配置
cachedEnvoyFilters []config.Config
// 正在监视的 Secret 集合
watchedSecretSet sets.Set
// 用于协调注册表的调解器
RegistryReconciler *reconcile.Reconciler
// MCP 桥接是否已调解的标志
mcpbridgeReconciled *atomic.Bool
// 管理 MCP 桥接相关的控制器和列表
mcpbridgeController mcpbridge.McpBridgeController
mcpbridgeLister netlisterv1.McpBridgeLister
// 管理 WasmPlugin 相关的控制器和列表
wasmPluginController wasmplugin.WasmPluginController
wasmPluginLister extlisterv1.WasmPluginLister
// 已注册的 WasmPlugin 集合,键为 WasmPlugin 名称
wasmPlugins map[string]*extensions.WasmPlugin
// 管理 HTTP2RPC 相关的控制器和列表
http2rpcController http2rpc.Http2RpcController
http2rpcLister netlisterv1.Http2RpcLister
// 已注册的 HTTP2RPC 集合,键为 HTTP2RPC 名称
http2rpcs map[string]*higressv1.Http2Rpc
// Configmap 的管理器
configmapMgr *configmap.ConfigmapMgr
// 用于更新 XDS
XDSUpdater model.XDSUpdater
// 处理注解的注解处理程序
annotationHandler annotations.AnnotationHandler
// 命名空间
namespace string
// 集群 ID
clusterId string
}

该结构体提供了一些方法,其实这个结构体本质上也是实现了translation.go提到的两个接口,方法列表和translation.go极其相似,我们分析一下一些比较复杂的方法,剩下的可自行定位到源码查看:

  • NewIngressConfig

    1. 初始化IngressConfig,创建多个子控制器(mcpbridge、wasmplugin、http2rpc等)并赋值给对应的字段上。
    2. 为多个子控制器注册事件监听器,用于监听Kubernetes资源的变化,并进行相应的处理。
    config := &IngressConfig{
    remoteIngressControllers: make(map[string]common.IngressController),
    localKubeClient: localKubeClient,
    XDSUpdater: XDSUpdater,
    annotationHandler: annotations.NewAnnotationHandlerManager(),
    ...
    }
    mcpbridgeController := mcpbridge.NewController(localKubeClient, clusterId)
    mcpbridgeController.AddEventHandler(config.AddOrUpdateMcpBridge, config.DeleteMcpBridge)
    config.mcpbridgeController = mcpbridgeController
    config.mcpbridgeLister = mcpbridgeController.Lister()
    ...

在这里,有几个方法需要注意:

1. `annotations.NewAnnotationHandlerManager()`
2. `mcpbridge.NewController(localKubeClient, clusterId)`
3. `wasmplugin.NewController(localKubeClient, clusterId)`
4. `http2rpc.NewController(localKubeClient, clusterId)`
5. `configmap.NewController(localKubeClient, clusterId, namespace)`

很明显这些方法分别对应pkg/ingress/kube目录下一些自定义的资源配置方法,后面会针对该目录进行单独的介绍。

  • RegisterEventHandler

    1. 对kind变量进行判断,判断属于哪种资源,并添加到对应的事件处理程序切片
    switch kind {
    case gvk.VirtualService:
    m.virtualServiceHandlers = append(m.virtualServiceHandlers, f)
    ...
    }
    1. 注册事件监听器
    for _, remoteIngressController := range m.remoteIngressControllers {
    remoteIngressController.RegisterEventHandler(kind, f)
    }

    我们可以定位一下remoteIngressControllerRegisterEventHandler方法:

    func (c *controller) RegisterEventHandler(kind config.GroupVersionKind, f model.EventHandler) {
    switch kind {
    case gvk.VirtualService:
    c.virtualServiceHandlers = append(c.virtualServiceHandlers, f)
    ...
    }
    }
  • AddLocalCluster

    1. 创建ingress总控制器,并注册到remoteIngressControllers
    ingressController = ingress.NewController(m.localKubeClient, m.localKubeClient, options, secretController)
    m.remoteIngressControllers[options.ClusterId] = ingressController
  • InitializeCluster

    1. 设置错误监听处理器
    _ = ingressController.SetWatchErrorHandler(m.watchErrorHandler)

    其中SetWatchErrorHandler方法如下,调用各个informer的SetWatchErrorHandler方法:

    func (c *controller) SetWatchErrorHandler(handler func(r *cache.Reflector, err error)) error {
    var errs error
    if err := c.serviceInformer.SetWatchErrorHandler(handler); err != nil {
    errs = multierror.Append(errs, err)
    }
    if err := c.ingressInformer.SetWatchErrorHandler(handler); err != nil {
    errs = multierror.Append(errs, err)
    }
    ...
    return errs
    }
    1. 通过go协程启动ingressController
    go ingressController.Run(stop)
  • List

    1. 前期检查,预防空指针或逻辑错误
    if typ != gvk.Gateway &&
    typ != gvk.VirtualService &&
    typ != gvk.DestinationRule &&
    typ != gvk.EnvoyFilter &&
    typ != gvk.ServiceEntry &&
    typ != gvk.WasmPlugin {
    return nil, common.ErrUnsupportedOp
    }
    // Currently, only support list all namespaces gateways or virtualservices.
    if namespace != "" {
    IngressLog.Warnf("ingress store only support type %s of all namespace.", typ)
    return nil, common.ErrUnsupportedOp
    }
    1. 如果kind类型是envoyfilter,拿出来处理掉
    if typ == gvk.EnvoyFilter {
    ...
    // Build configmap envoy filters
    configmapEnvoyFilters, err := m.configmapMgr.ConstructEnvoyFilters()
    ...
    }
    1. 如果不是,调用ingress控制器(如果有kingress控制器,也给带上)的List方法,添加到config列表,以ingress控制器的方法为例,会获取informer对象的存储器,并对其进行遍历,将遍历到的对象进行深拷贝,并返回config列表。由于List方法是实现ConfigStore接口的,istio内部会对这个config列表进行下一步处理。
    var configs []config.Config
    m.mutex.RLock()
    for _, ingressController := range m.remoteIngressControllers {
    configs = append(configs, ingressController.List()...)
    }
    m.mutex.RUnlock()
    1. 对config列表根据创建时间进行排序,并创建一个包装好的config列表,wrapperConfigs里除了Config``,还会包含AnnotationsConfig`注解配置
    common.SortIngressByCreationTime(configs)
    wrapperConfigs := m.createWrapperConfigs(configs)
    1. 根据kind类型,将wrapperConfigs转换为对应的资源,代码如下,并为convertGateways附加注释
    switch typ {
    case gvk.Gateway:
    //1.初始化ConvertOptions
    //2.调用ingress控制器的ConvertGateway方法,在这里,ingress控制器首先做一些检查,并遍历rule规则列表
    // 为每个rule创建IngressDomainBuilder构造器,中间过程会构造gateway包装器、获取tls密钥名称等等
    // 在这个过程中会对config包装器进行修改
    //3.apply注解
    //4.组合成istio能够识别的config列表
    return m.convertGateways(wrapperConfigs), nil
    case gvk.VirtualService:
    return m.convertVirtualService(wrapperConfigs), nil
    case gvk.DestinationRule:
    return m.convertDestinationRule(wrapperConfigs), nil
    ...
    }

    这块内容较多,具备一定的难度,需要对各个资源有一定的理解。

以下几个方法的作用是极为相似的

  • AddOrUpdateWasmPlugin
  • DeleteWasmPlugin
  • AddOrUpdateMcpBridge
  • DeleteMcpBridge
  • AddOrUpdateHttp2Rpc
  • DeleteHttp2Rpc

顾名思义,就是在监听对应资源的过程中,发生资源的添加或者变更或者删除进行对应的操作。

AddOrUpdateWasmPluginDeleteWasmPlugin为例。

  • AddOrUpdateWasmPlugin

    1. 获取wasmplugin
    wasmPlugin, err := m.wasmPluginLister.WasmPlugins(clusterNamespacedName.Namespace).Get(clusterNamespacedName.Name)
    1. 将wasmplugin转化为istio能够识别的wasmplugin
    istioWasmPlugin, err := m.convertIstioWasmPlugin(&wasmPlugin.Spec)
    1. 更新IngressConfig
    m.wasmPlugins[clusterNamespacedName.Name] = istioWasmPlugin
  • DeleteWasmPlugin

    1. 更新IngressConfig
    delete(m.wasmPlugins, clusterNamespacedName.Name)
    1. 如果存在该wasmplugin,触发wasmPluginHandlers的事件监听器,执行对应的操作
    for _, f := range m.wasmPluginHandlers {
    IngressLog.Debug("WasmPlugin triggerd update")
    f(config.Config{Meta: metadata}, config.Config{Meta: metadata}, model.EventDelete)
    }

到此为此,ingress_config.go的内容基本介绍完毕,大部分内容并未做过多深入,感兴趣的朋友可以自行下载源码浏览查看。

我们现在回到最开始的initConfigController函数上,可以发现这个函数的核心内容其实就是ingress_config.go,当然还有对应的多个控制器,关于控制器的介绍,将在后面进行介绍。

initRegistryEventHandlers函数剖析

该函数相对于initConfigController函数来说简单很多,其主要做的内容就是配置xds更新处理器,并将其作为事件监听器的事件处理程序,注册给configController,也就是说,当发生资源的变更时,会通过xds协议进行推送更新。

pkg/ingress/kube目录介绍

这部分内容较多,后续可自行针对感兴趣的部分进行精读。

  • annotations

    这里主要处理ingress注解的配置,在NewAnnotationHandlerManager方法里列出了一些已经支持的注解,每个注解的处理程序需要实现AnnotationHandler所定义的接口列表。

  • common

    这里核心的文件为controller.go,主要做了一些资源的包装,以及定义了IngressController接口方法,其实现类有pkg/ingress/kube/ingress/controller.gopkg/ingress/kube/ingressv1/controller.go

  • configmap

    这里主要为ConfigMap添加了higress的KV对,在controller.go中定义了ItemController接口,和前者提到的一样,这里也提供了相似的更新配置的方法AddOrUpdateHigressConfig,同样地,也是先获取当前的value值,并获取旧的value值,进行一个比对,比对结果可能为新增、更新、删除,分别执行对应的事件监听器里所定义的事件处理器,也就是XDSUpdater

  • controller

    在前面的介绍中,提到了很多次事件监听器,可能会有人疑问是怎么做资源监听的,资源监听的代码就位于本目录下了。这里可以着重介绍一下,事件监听的具体实现原理。

    1. 定义了Controller[lister any]接口,其实现类为CommonController。 我们可以看一下这个实现结构体有哪些需要注意的字段。

      lister: 这个字段为any类型,会在NewCommonController方法里传入进来,点击查看该方法调用情况,可以发现传入该字段的都是各个资源的监听器,例如ConfigMap的监听器就是client.KubeInformer().Core().V1().ConfigMaps().Lister().ConfigMaps(namespace),其它也是如此。 updateHandler: 这个字段为func(util.ClusterNamespacedName)类型,主要用于存储对应监听器lister所提供的更新或者新增事件处理器,存储代码如下:

      func (c *CommonController[lister]) AddEventHandler(addOrUpdate func(util.ClusterNamespacedName), delete ...func(util.ClusterNamespacedName)) {
      c.updateHandler = addOrUpdate
      if len(delete) > 0 {
      c.removeHandler = delete[0]
      }
      }

      removeHandler: 同updateHandler

    2. 接口方法除了刚刚介绍的AddEventHandler方法,还有个Run方法需要注意一下

      func (c *CommonController[lister]) Run(stop <-chan struct{}) {
      defer utilruntime.HandleCrash()
      defer c.queue.ShutDown()
      if !cache.WaitForCacheSync(stop, c.HasSynced) {
      IngressLog.Errorf("Failed to sync %s controller cache", c.typeName)
      return
      }
      IngressLog.Debugf("%s cache has synced", c.typeName)
      go wait.Until(c.worker, time.Second, stop)
      <-stop
      }

      可以看有个wait.Until方法,用于每秒钟执行c.worker方法,直到接收到stop信号,方法调用链如下:c.worker->c.processNextWorkItem()->c.onEvent(ingressNamespacedName)->c.updateHandler(obj),可以看出来当我们创建一个控制器,并为其注册了一个事件监听器以及事件处理器(updateHandler),因此每秒钟会执行事件处理器,也就实现了监听的逻辑。

    也就是说,这个目录下主要定义了一个公共的控制器,可以节省大量代码的编写,我们在创建一个新的子控制器的时候,可以调用NewCommonController方法来实现监听逻辑。

  • http2rpc

    代码较为简单,通过NewCommonController来创建Http2RpcController,并提供了GetHttp2Rpc方法。

  • ingress

    Ingress控制器的核心代码逻辑,主要为前文提到的ingress_config.go所调用。部分内容在前文已经简单介绍过。

  • ingressv1

    Ingress的API版本v1相关的控制器代码。

  • kingress

    kingress相关的控制器代码。

  • mcpbridge

    http2rpc目录。

  • secret

    http2rpc目录。

  • util

    工具包。

  • wasmplugin

    http2rpc目录。

higress-core流程图

经过前面的源码分析,相信大家对higress核心逻辑已经有了初步的理解,后续可以针对自己感兴趣的部分,进行深入研究,其中会涉及到Kubernetes资源配置的核心逻辑以及Istio的基本掌握,当然,如果给它研究透了,对这些基本原理的理解将会更为深刻。

流程图如下:

img.png

对于初始化配置控制器部分,可进一步绘画流程图。

img1.png

关于事件监听部分,其流程图如下:

img2.png

总结

以上是对higress源码的初步分析,希望能够给用户以及开发者对higress有个基本的了解。