Kubernetes Source Code Reading: client-go Source Layout and Client Objects

Introduction to client-go

Kubernetes uses client-go as its official Go client library for programmatic access to the Kubernetes API Server. Developers commonly build on top of Kubernetes with client-go, so it is a library worth knowing well.

The environment and versions used in this article:

  • OS: Ubuntu 20.04.1 LTS
  • Kubernetes Branch: release-1.14
  • Golang Version: 1.14.7
  • GoLand Version: 2020.2

Source Directory

The client-go source tree looks like this:

lee@u:~/go/src/k8s.io/kubernetes/staging/src/k8s.io/client-go$ tree -L 1 -d
.
├── deprecated-dynamic  # deprecated
├── discovery           # provides DiscoveryClient, the discovery client
├── dynamic             # provides DynamicClient, the dynamic client
├── examples            # client-go usage examples
├── Godeps              # files generated by godep; can be ignored
├── informers           # Informer implementations for each Kubernetes resource
├── kubernetes          # provides the ClientSet client
├── kubernetes_test     # ClientSet tests
├── listers             # a Lister for each Kubernetes resource; serves Get and List requests from a read-only cache
├── pkg
├── plugin              # authentication plugins for cloud providers such as OpenStack and Azure
├── rest                # provides RESTClient, which wraps RESTful operations against the Kubernetes API Server
├── restmapper
├── scale               # provides ScaleClient, used for scaling up and down
├── testing
├── third_party
├── tools               # common tools
├── transport           # provides secure TCP connections, with support for HTTP streaming
└── util                # common helper functions

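Client Objects

Before looking at the clients themselves, let's look at rest.Config, the configuration needed to initialize them.

Client Configuration: rest.Config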

staging/src/k8s.io/client-go/rest/config.go

// Config holds the common attributes that can be passed to a Kubernetes client on
// initialization.
type Config struct {
	// Host must be a host string, a host:port pair, or a URL to the base of the apiserver.
	// If a URL is given then the (optional) Path of that URL represents a prefix that must
	// be appended to all request URIs used to access the apiserver. This allows a frontend
	// proxy to easily relocate all of the apiserver endpoints.
	Host string
	// APIPath is a sub-path that points to an API root.
	APIPath string

	// ContentConfig contains settings that affect how objects are transformed when
	// sent to the server.
	ContentConfig

	// Server requires Basic authentication
	Username string
	Password string

	// Server requires Bearer authentication. This client will not attempt to use
	// refresh tokens for an OAuth2 flow.
	// TODO: demonstrate an OAuth2 compatible client.
	BearerToken string

	// Path to a file containing a BearerToken.
	// If set, the contents are periodically read.
	// The last successfully read value takes precedence over BearerToken.
	BearerTokenFile string

	// Impersonate is the configuration that RESTClient will use for impersonation.
	Impersonate ImpersonationConfig

	// Server requires plugin-specified authentication.
	AuthProvider *clientcmdapi.AuthProviderConfig

	// Callback to persist config for AuthProvider.
	AuthConfigPersister AuthProviderConfigPersister

	// Exec-based authentication provider.
	ExecProvider *clientcmdapi.ExecConfig

	// TLSClientConfig contains settings to enable transport layer security
	TLSClientConfig

	// UserAgent is an optional field that specifies the caller of this request.
	UserAgent string

	// Transport may be used for custom HTTP behavior. This attribute may not
	// be specified with the TLS client certificate options. Use WrapTransport
	// to provide additional per-server middleware behavior.
	Transport http.RoundTripper
	// WrapTransport will be invoked for custom HTTP behavior after the underlying
	// transport is initialized (either the transport created from TLSClientConfig,
	// Transport, or http.DefaultTransport). The config may layer other RoundTrippers
	// on top of the returned RoundTripper.
	//
	// A future release will change this field to an array. Use config.Wrap()
	// instead of setting this value directly.
	WrapTransport transport.WrapperFunc

	// QPS indicates the maximum QPS to the master from this client.
	// If it's zero, the created RESTClient will use DefaultQPS: 5
	QPS float32

	// Maximum burst for throttle.
	// If it's zero, the created RESTClient will use DefaultBurst: 10.
	Burst int

	// Rate limiter for limiting connections to the master from this client. If present overwrites QPS/Burst
	RateLimiter flowcontrol.RateLimiter

	// The maximum length of time to wait before giving up on a server request. A value of zero means no timeout.
	Timeout time.Duration

	// Dial specifies the dial function for creating unencrypted TCP connections.
	Dial func(ctx context.Context, network, address string) (net.Conn, error)

	// Version forces a specific version to be used (if registered)
	// Do we need this?
	// Version string
}

A rest.Config object can be obtained by calling the BuildConfigFromFlags function.

staging/src/k8s.io/client-go/tools/clientcmd/client_config.go

// BuildConfigFromFlags is a helper function that builds configs from a master
// url or a kubeconfig filepath. These are passed in as command line flags for cluster
// components. Warnings should reflect this usage. If neither masterUrl or kubeconfigPath
// are passed in we fallback to inClusterConfig. If inClusterConfig fails, we fallback
// to the default config.
func BuildConfigFromFlags(masterUrl, kubeconfigPath string) (*restclient.Config, error) {
	if kubeconfigPath == "" && masterUrl == "" {
		klog.Warningf("Neither --kubeconfig nor --master was specified.  Using the inClusterConfig.  This might not work.")
		kubeconfig, err := restclient.InClusterConfig()
		if err == nil {
			return kubeconfig, nil
		}
		klog.Warning("error creating inClusterConfig, falling back to default config: ", err)
	}
	return NewNonInteractiveDeferredLoadingClientConfig(
		&ClientConfigLoadingRules{ExplicitPath: kubeconfigPath},
		&ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: masterUrl}}).ClientConfig()
}

If neither masterUrl nor kubeconfigPath is passed, the in-cluster configuration returned by InClusterConfig is tried first.

staging/src/k8s.io/client-go/rest/config.go

// InClusterConfig returns a config object which uses the service account
// kubernetes gives to pods. It's intended for clients that expect to be
// running inside a pod running on kubernetes. It will return ErrNotInCluster
// if called from a process not running in a kubernetes environment.
func InClusterConfig() (*Config, error) {
	const (
		tokenFile  = "/var/run/secrets/kubernetes.io/serviceaccount/token"
		rootCAFile = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
	)
	host, port := os.Getenv("KUBERNETES_SERVICE_HOST"), os.Getenv("KUBERNETES_SERVICE_PORT")
	if len(host) == 0 || len(port) == 0 {
		return nil, ErrNotInCluster
	}

	token, err := ioutil.ReadFile(tokenFile)
	if err != nil {
		return nil, err
	}

	tlsClientConfig := TLSClientConfig{}

	if _, err := certutil.NewPool(rootCAFile); err != nil {
		klog.Errorf("Expected to load root CA config from %s, but got err: %v", rootCAFile, err)
	} else {
		tlsClientConfig.CAFile = rootCAFile
	}

	return &Config{
		// TODO: switch to using cluster DNS.
		Host:            "https://" + net.JoinHostPort(host, port),
		TLSClientConfig: tlsClientConfig,
		BearerToken:     string(token),
		BearerTokenFile: tokenFile,
	}, nil
}
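
The host detection at the start of InClusterConfig can be tried outside a cluster with a small standard-library sketch. The helper name inClusterHost, the injected getenv function, and errNotInCluster are assumptions made for illustration; they are not part of client-go, and the token and CA handling is left out.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"os"
)

// errNotInCluster is a stand-in for rest.ErrNotInCluster (illustrative only).
var errNotInCluster = errors.New("not running inside a Kubernetes pod")

// inClusterHost is a hypothetical helper. It builds the API server URL from
// the two service environment variables, which are looked up through getenv
// so that the function can be exercised without a real cluster.
func inClusterHost(getenv func(string) string) (string, error) {
	host := getenv("KUBERNETES_SERVICE_HOST")
	port := getenv("KUBERNETES_SERVICE_PORT")
	if host == "" || port == "" {
		return "", errNotInCluster
	}
	// JoinHostPort puts brackets around IPv6 literals, e.g. "[fd00::1]:443".
	return "https://" + net.JoinHostPort(host, port), nil
}

func main() {
	host, err := inClusterHost(os.Getenv)
	if err != nil {
		fmt.Println("in-cluster config unavailable:", err)
		return
	}
	fmt.Println("API server:", host)
}
```

Run on a workstation, this normally takes the error branch, which corresponds to the ErrNotInCluster case in the source above.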

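When masterUrl or kubeconfigPath is set, or when InClusterConfig fails, the rest.Config is obtained through ClientConfig, as explained in detail below.

The ClientConfig Interface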

staging/src/k8s.io/client-go/tools/clientcmd/client_config.go

// ClientConfig is used to make it easy to get an api server client
type ClientConfig interface {
	// RawConfig returns the merged result of all overrides
	RawConfig() (clientcmdapi.Config, error)
	// ClientConfig returns a complete client config
	ClientConfig() (*restclient.Config, error)
	// Namespace returns the namespace resulting from the merged
	// result of all overrides and a boolean indicating if it was
	// overridden
	Namespace() (string, bool, error)
	// ConfigAccess returns the rules for loading/persisting the config.
	ConfigAccess() ConfigAccess
}

Three structs implement the ClientConfig interface: DeferredLoadingClientConfig, DirectClientConfig, and inClusterClientConfig.

How the Config Is Obtained

BuildConfigFromFlags ends by calling NewNonInteractiveDeferredLoadingClientConfig, which actually returns a DeferredLoadingClientConfig object.

staging/src/k8s.io/client-go/tools/clientcmd/merged_client_builder.go

// NewNonInteractiveDeferredLoadingClientConfig creates a ConfigClientClientConfig using the passed context name
func NewNonInteractiveDeferredLoadingClientConfig(loader ClientConfigLoader, overrides *ConfigOverrides) ClientConfig {
	return &DeferredLoadingClientConfig{loader: loader, overrides: overrides, icc: &inClusterClientConfig{overrides: overrides}}
}

The core logic for obtaining rest.Config lives in the ClientConfig method of DeferredLoadingClientConfig.

staging/src/k8s.io/client-go/tools/clientcmd/merged_client_builder.go

// ClientConfig implements ClientConfig
func (config *DeferredLoadingClientConfig) ClientConfig() (*restclient.Config, error) {
	mergedClientConfig, err := config.createClientConfig()
	if err != nil {
		return nil, err
	}

	// load the configuration and return on non-empty errors and if the
	// content differs from the default config
	mergedConfig, err := mergedClientConfig.ClientConfig()
	switch {
	case err != nil:
		if !IsEmptyConfig(err) {
			// return on any error except empty config
			return nil, err
		}
	case mergedConfig != nil:
		// the configuration is valid, but if this is equal to the defaults we should try
		// in-cluster configuration
		if !config.loader.IsDefaultConfig(mergedConfig) {
			return mergedConfig, nil
		}
	}

	// check for in-cluster configuration and use it
	if config.icc.Possible() {
		klog.V(4).Infof("Using in-cluster configuration")
		return config.icc.ClientConfig()
	}

	// return the result of the merged client config
	return mergedConfig, err
}
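
The order of checks in this method is easy to lose among the error handling, so here is a reduced model of it. Everything in the sketch (mergedResult, pickSource, errEmptyConfig, the returned labels) is a hypothetical stand-in; it only mirrors the decision order and does not touch real kubeconfig files.

```go
package main

import (
	"errors"
	"fmt"
)

// errEmptyConfig stands in for the "empty config" error that IsEmptyConfig detects.
var errEmptyConfig = errors.New("no configuration has been provided")

// mergedResult is a hypothetical summary of what loading the kubeconfig produced.
type mergedResult struct {
	err       error // error from building the merged config, if any
	isDefault bool  // true when the merged config equals the built-in defaults
}

// pickSource reports which configuration source wins. inClusterPossible plays
// the role of icc.Possible() in the method above.
func pickSource(m mergedResult, inClusterPossible bool) (string, error) {
	switch {
	case m.err != nil:
		if !errors.Is(m.err, errEmptyConfig) {
			return "", m.err // any error other than "empty config" is returned at once
		}
	case !m.isDefault:
		return "kubeconfig", nil // an explicit, non-default config wins
	}
	if inClusterPossible {
		return "in-cluster", nil
	}
	// Nothing better is available: hand back the merged result as it is.
	if m.err != nil {
		return "", m.err
	}
	return "defaults", nil
}

func main() {
	src, err := pickSource(mergedResult{isDefault: true}, true)
	fmt.Println(src, err)
}
```

The point of the model is that in-cluster configuration is only consulted when the kubeconfig is empty or indistinguishable from the defaults.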

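In DeferredLoadingClientConfig.ClientConfig, config.createClientConfig() creates an object that satisfies the ClientConfig interface, and that object's ClientConfig() method is then called to obtain the rest.Config. The core logic therefore lives in the ClientConfig method of mergedClientConfig. Let's look at createClientConfig first.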

staging/src/k8s.io/client-go/tools/clientcmd/merged_client_builder.go

func (config *DeferredLoadingClientConfig) createClientConfig() (ClientConfig, error) {
	if config.clientConfig == nil {
		config.loadingLock.Lock()
		defer config.loadingLock.Unlock()

		if config.clientConfig == nil {
			mergedConfig, err := config.loader.Load()
			if err != nil {
				return nil, err
			}

			var mergedClientConfig ClientConfig
			if config.fallbackReader != nil {
				mergedClientConfig = NewInteractiveClientConfig(*mergedConfig, config.overrides.CurrentContext, config.overrides, config.fallbackReader, config.loader)
			} else {
				mergedClientConfig = NewNonInteractiveClientConfig(*mergedConfig, config.overrides.CurrentContext, config.overrides, config.loader)
			}

			config.clientConfig = mergedClientConfig
		}
	}

	return config.clientConfig, nil
}
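
The method above is a load-once-and-cache pattern. A minimal sketch of the same idea follows, assuming a hypothetical lazyConfig type whose load field plays the role of loader.Load(). Unlike the source, which checks the field before and after taking the lock, this version simply holds the mutex for the whole call; that is a simplification, not the author's or the library's code.

```go
package main

import (
	"fmt"
	"sync"
)

// lazyConfig is a hypothetical stand-in for DeferredLoadingClientConfig:
// it loads a value on first use and caches it afterwards.
type lazyConfig struct {
	mu     sync.Mutex
	loaded bool
	value  string
	load   func() (string, error) // plays the role of loader.Load()
}

// get returns the cached value, loading it at most once on success.
// A failed load is not cached, so a later call tries again.
func (l *lazyConfig) get() (string, error) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if !l.loaded {
		v, err := l.load()
		if err != nil {
			return "", err
		}
		l.value, l.loaded = v, true
	}
	return l.value, nil
}

func main() {
	calls := 0
	cfg := &lazyConfig{load: func() (string, error) {
		calls++
		return "merged kubeconfig", nil
	}}
	for i := 0; i < 3; i++ {
		v, _ := cfg.get()
		fmt.Println(v)
	}
	fmt.Println("load calls:", calls)
}
```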

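The core logic of createClientConfig:

  1. Call config.loader.Load(), which uses the Load method of the ClientConfigLoadingRules object to load and merge the configuration, returning a *clientcmdapi.Config.
  2. Call NewInteractiveClientConfig or NewNonInteractiveClientConfig, either of which returns a DirectClientConfig object.

So the mergedClientConfig.ClientConfig() call shown earlier actually invokes the ClientConfig() method of DirectClientConfig, whose source is: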

staging/src/k8s.io/client-go/tools/clientcmd/client_config.go

// ClientConfig implements ClientConfig
func (config *DirectClientConfig) ClientConfig() (*restclient.Config, error) {
	// check that getAuthInfo, getContext, and getCluster do not return an error.
	// Do this before checking if the current config is usable in the event that an
	// AuthInfo, Context, or Cluster config with user-defined names are not found.
	// This provides a user with the immediate cause for error if one is found
	configAuthInfo, err := config.getAuthInfo()
	if err != nil {
		return nil, err
	}

	_, err = config.getContext()
	if err != nil {
		return nil, err
	}

	configClusterInfo, err := config.getCluster()
	if err != nil {
		return nil, err
	}

	if err := config.ConfirmUsable(); err != nil {
		return nil, err
	}

	clientConfig := &restclient.Config{}
	clientConfig.Host = configClusterInfo.Server

	if len(config.overrides.Timeout) > 0 {
		timeout, err := ParseTimeout(config.overrides.Timeout)
		if err != nil {
			return nil, err
		}
		clientConfig.Timeout = timeout
	}

	if u, err := url.ParseRequestURI(clientConfig.Host); err == nil && u.Opaque == "" && len(u.Path) > 1 {
		u.RawQuery = ""
		u.Fragment = ""
		clientConfig.Host = u.String()
	}
	if len(configAuthInfo.Impersonate) > 0 {
		clientConfig.Impersonate = restclient.ImpersonationConfig{
			UserName: configAuthInfo.Impersonate,
			Groups:   configAuthInfo.ImpersonateGroups,
			Extra:    configAuthInfo.ImpersonateUserExtra,
		}
	}

	// only try to read the auth information if we are secure
	if restclient.IsConfigTransportTLS(*clientConfig) {
		var err error
		var persister restclient.AuthProviderConfigPersister
		if config.configAccess != nil {
			authInfoName, _ := config.getAuthInfoName()
			persister = PersisterForUser(config.configAccess, authInfoName)
		}
		userAuthPartialConfig, err := config.getUserIdentificationPartialConfig(configAuthInfo, config.fallbackReader, persister)
		if err != nil {
			return nil, err
		}
		mergo.MergeWithOverwrite(clientConfig, userAuthPartialConfig)

		serverAuthPartialConfig, err := getServerIdentificationPartialConfig(configAuthInfo, configClusterInfo)
		if err != nil {
			return nil, err
		}
		mergo.MergeWithOverwrite(clientConfig, serverAuthPartialConfig)
	}

	return clientConfig, nil
}

As shown, the function initializes a rest.Config object internally and returns it.
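
One step in that function that is easy to skim past is the clean-up of clientConfig.Host with url.ParseRequestURI. The sketch below repeats just that step in isolation; normalizeHost is a hypothetical name and the example hosts are made up.

```go
package main

import (
	"fmt"
	"net/url"
)

// normalizeHost is a hypothetical helper that repeats the URL clean-up step:
// when host parses as a URL with a path prefix, its query and fragment are
// dropped; anything else is returned untouched.
func normalizeHost(host string) string {
	u, err := url.ParseRequestURI(host)
	if err == nil && u.Opaque == "" && len(u.Path) > 1 {
		u.RawQuery = ""
		u.Fragment = ""
		return u.String()
	}
	return host
}

func main() {
	for _, h := range []string{
		"https://example.com/k8s/clusters/c1?token=abc", // URL with a path prefix
		"https://example.com:6443",                      // URL without a path
		"example.com:6443",                              // bare host:port
	} {
		fmt.Printf("%-48s -> %s\n", h, normalizeHost(h))
	}
}
```

Only the first host is rewritten; the other two do not satisfy the path-prefix condition and pass through unchanged.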

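RESTClient

RESTClient is the most basic client; all other clients are built on top of it. It wraps HTTP requests and exposes a RESTful-style API. It is highly flexible because data is not tied to particular methods or resources, so RESTClient can handle many kinds of calls and return different data formats.

RESTClient Structure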

https://cdn.lixianyang.xyz/github/RESTClient-Tree.webp

Three functions create a RESTClient object; the last two call NewRESTClient internally.

  1. NewRESTClient(…) (*RESTClient, error)
  2. RESTClientFor(config *Config) (*RESTClient, error)
  3. UnversionedRESTClientFor(config *Config) (*RESTClient, error)

Methods such as Get, Post, and Patch call the Verb method, and all of them return a Request object. Understanding Request is the key to understanding RESTClient.
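
To make the "verb method returns a Request" idea concrete, here is a much reduced model of such a builder. The request type, newRequest, get, and the validation rule in Resource are all hypothetical; the real rest.Request carries far more state. The sketch only shows two properties: each setter returns the same object so calls can be chained, and an error recorded while building surfaces only when the request is finally used.

```go
package main

import (
	"errors"
	"fmt"
	"path"
)

// request is a hypothetical, much reduced model of rest.Request.
type request struct {
	verb, prefix, namespace, resource, name string
	err                                     error // first error seen while building
}

// newRequest starts a chain; get wraps it the way Get() wraps Verb("GET").
func newRequest(verb, prefix string) *request {
	return &request{verb: verb, prefix: prefix}
}

func get(prefix string) *request {
	return newRequest("GET", prefix)
}

func (r *request) Namespace(ns string) *request {
	r.namespace = ns
	return r
}

func (r *request) Resource(res string) *request {
	if res == "" && r.err == nil {
		r.err = errors.New("resource must not be empty")
	}
	r.resource = res
	return r
}

func (r *request) Name(n string) *request {
	r.name = n
	return r
}

// URL assembles the request path. Errors recorded earlier surface here,
// at the end of the chain, rather than at the call that caused them.
func (r *request) URL() (string, error) {
	if r.err != nil {
		return "", r.err
	}
	p := r.prefix
	if r.namespace != "" {
		p = path.Join(p, "namespaces", r.namespace)
	}
	return path.Join(p, r.resource, r.name), nil
}

func main() {
	u, err := get("/api/v1").Namespace("kube-system").Resource("pods").Name("coredns-0").URL()
	fmt.Println(u, err)
}
```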

The Request Structure

Request lets you build a request to the server in a chained style; the request is finally executed by calling Do.

staging/src/k8s.io/client-go/rest/request.go

// Do formats and executes the request. Returns a Result object for easy response
// processing.
//
// Error type:
//  * If the request can't be constructed, or an error happened earlier while building its
//    arguments: *RequestConstructionError
//  * If the server responds with a status: *errors.StatusError or *errors.UnexpectedObjectError
//  * http.Client.Do errors are returned directly.
func (r *Request) Do() Result {
	r.tryThrottle()

	var result Result
	err := r.request(func(req *http.Request, resp *http.Response) {
		result = r.transformResponse(resp, req)
	})
	if err != nil {
		return Result{err: err}
	}
	return result
}

Do calls the request method:

staging/src/k8s.io/client-go/rest/request.go

// request connects to the server and invokes the provided function when a server response is
// received. It handles retry behavior and up front validation of requests. It will invoke
// fn at most once. It will return an error if a problem occurred prior to connecting to the
// server - the provided function is responsible for handling server errors.
func (r *Request) request(fn func(*http.Request, *http.Response)) error {
	//Metrics for total request latency
	start := time.Now()
	defer func() {
		metrics.RequestLatency.Observe(r.verb, r.finalURLTemplate(), time.Since(start))
	}()

	if r.err != nil {
		klog.V(4).Infof("Error in request: %v", r.err)
		return r.err
	}

	// TODO: added to catch programmer errors (invoking operations with an object with an empty namespace)
	if (r.verb == "GET" || r.verb == "PUT" || r.verb == "DELETE") && r.namespaceSet && len(r.resourceName) > 0 && len(r.namespace) == 0 {
		return fmt.Errorf("an empty namespace may not be set when a resource name is provided")
	}
	if (r.verb == "POST") && r.namespaceSet && len(r.namespace) == 0 {
		return fmt.Errorf("an empty namespace may not be set during creation")
	}

	client := r.client
	if client == nil {
		client = http.DefaultClient
	}

	// Right now we make about ten retry attempts if we get a Retry-After response.
	maxRetries := 10
	retries := 0
	for {
		url := r.URL().String()
		req, err := http.NewRequest(r.verb, url, r.body)
		if err != nil {
			return err
		}
		if r.timeout > 0 {
			if r.ctx == nil {
				r.ctx = context.Background()
			}
			var cancelFn context.CancelFunc
			r.ctx, cancelFn = context.WithTimeout(r.ctx, r.timeout)
			defer cancelFn()
		}
		if r.ctx != nil {
			req = req.WithContext(r.ctx)
		}
		req.Header = r.headers

		r.backoffMgr.Sleep(r.backoffMgr.CalculateBackoff(r.URL()))
		if retries > 0 {
			// We are retrying the request that we already send to apiserver
			// at least once before.
			// This request should also be throttled with the client-internal throttler.
			r.tryThrottle()
		}
		resp, err := client.Do(req)
		updateURLMetrics(r, resp, err)
		if err != nil {
			r.backoffMgr.UpdateBackoff(r.URL(), err, 0)
		} else {
			r.backoffMgr.UpdateBackoff(r.URL(), err, resp.StatusCode)
		}
		if err != nil {
			// "Connection reset by peer" is usually a transient error.
			// Thus in case of "GET" operations, we simply retry it.
			// We are not automatically retrying "write" operations, as
			// they are not idempotent.
			if !net.IsConnectionReset(err) || r.verb != "GET" {
				return err
			}
			// For the purpose of retry, we set the artificial "retry-after" response.
			// TODO: Should we clean the original response if it exists?
			resp = &http.Response{
				StatusCode: http.StatusInternalServerError,
				Header:     http.Header{"Retry-After": []string{"1"}},
				Body:       ioutil.NopCloser(bytes.NewReader([]byte{})),
			}
		}

		done := func() bool {
			// Ensure the response body is fully read and closed
			// before we reconnect, so that we reuse the same TCP
			// connection.
			defer func() {
				const maxBodySlurpSize = 2 << 10
				if resp.ContentLength <= maxBodySlurpSize {
					io.Copy(ioutil.Discard, &io.LimitedReader{R: resp.Body, N: maxBodySlurpSize})
				}
				resp.Body.Close()
			}()

			retries++
			if seconds, wait := checkWait(resp); wait && retries < maxRetries {
				if seeker, ok := r.body.(io.Seeker); ok && r.body != nil {
					_, err := seeker.Seek(0, 0)
					if err != nil {
						klog.V(4).Infof("Could not retry request, can't Seek() back to beginning of body for %T", r.body)
						fn(req, resp)
						return true
					}
				}

				klog.V(4).Infof("Got a Retry-After %ds response for attempt %d to %v", seconds, retries, url)
				r.backoffMgr.Sleep(time.Duration(seconds) * time.Second)
				return false
			}
			fn(req, resp)
			return true
		}()
		if done {
			return nil
		}
	}
}
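
The retry loop is the heart of request, so a stripped-down version may help. The sketch below assumes Go 1.16 or newer (for io.Discard) and uses only net/http and httptest. getWithRetry and demo are hypothetical names, and checkWait here is a simplified stand-in for the client-go helper of the same name: it treats 429 and 5xx responses with an integer Retry-After header as retryable. Backoff management, throttling, and body rewinding are left out.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strconv"
	"time"
)

// checkWait reports how long to wait when the status is 429 or 5xx and the
// response carries an integer Retry-After header (simplified stand-in).
func checkWait(resp *http.Response) (int, bool) {
	if resp.StatusCode != http.StatusTooManyRequests && resp.StatusCode < 500 {
		return 0, false
	}
	seconds, err := strconv.Atoi(resp.Header.Get("Retry-After"))
	return seconds, err == nil
}

// getWithRetry issues GET requests until the server stops asking for a retry
// or maxRetries attempts have been made. It returns the final status code
// and the number of attempts.
func getWithRetry(client *http.Client, url string, maxRetries int) (int, int, error) {
	for attempt := 1; ; attempt++ {
		resp, err := client.Get(url)
		if err != nil {
			return 0, attempt, err
		}
		// Drain and close the body so the TCP connection can be reused.
		io.Copy(io.Discard, resp.Body)
		resp.Body.Close()

		seconds, wait := checkWait(resp)
		if !wait || attempt >= maxRetries {
			return resp.StatusCode, attempt, nil
		}
		time.Sleep(time.Duration(seconds) * time.Second)
	}
}

// demo runs the loop against a local test server that answers 429 twice
// (with Retry-After: 0) before answering 200.
func demo() (status, attempts int) {
	hits := 0
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		hits++
		if hits <= 2 {
			w.Header().Set("Retry-After", "0")
			w.WriteHeader(http.StatusTooManyRequests)
			return
		}
		w.WriteHeader(http.StatusOK)
	}))
	defer srv.Close()
	status, attempts, _ = getWithRetry(srv.Client(), srv.URL, 10)
	return status, attempts
}

func main() {
	status, attempts := demo()
	fmt.Printf("status=%d after %d attempts\n", status, attempts)
}
```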

Usage Example

The code below lists information about all Pod objects in the kube-system namespace.

package main

import (
	"fmt"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
	if err != nil {
		panic(err)
	}
	config.APIPath = "api"
	config.GroupVersion = &corev1.SchemeGroupVersion
	config.NegotiatedSerializer = scheme.Codecs

	restClient, err := rest.RESTClientFor(config)
	if err != nil {
		panic(err)
	}

	result := &corev1.PodList{}

	err = restClient.Get().
		Namespace("kube-system").
		Resource("pods").
		VersionedParams(&metav1.ListOptions{Limit: 500}, scheme.ParameterCodec).
		Do().
		Into(result)
	if err != nil {
		panic(err)
	}

	for _, item := range result.Items {
		fmt.Printf("Namespace:%v \t status:%+v \t Name:%v\n", item.Namespace, item.Status.Phase, item.Name)
	}
}

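config.NegotiatedSerializer is a required field. It is used mainly in three places:

  1. In the NewRequest method that creates a request, to set the Accept header.
  2. In Request.transformResponse, to obtain a decoder.
  3. In Result.Into, to decode the response into a concrete object.

The example assigns scheme.Codecs to it; the key code is: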

// Path: staging/src/k8s.io/client-go/kubernetes/scheme/register.go
var Codecs = serializer.NewCodecFactory(Scheme)

// Path: staging/src/k8s.io/apimachinery/pkg/runtime/serializer/codec_factory.go
// NewCodecFactory provides methods for retrieving serializers for the supported wire formats
// and conversion wrappers to define preferred internal and external versions. In the future,
// as the internal version is used less, callers may instead use a defaulting serializer and
// only convert objects which are shared internally (Status, common API machinery).
// TODO: allow other codecs to be compiled in?
// TODO: accept a scheme interface
func NewCodecFactory(scheme *runtime.Scheme) CodecFactory {
	serializers := newSerializersForScheme(scheme, json.DefaultMetaFactory)
	return newCodecFactory(scheme, serializers)
}

// Path: staging/src/k8s.io/apimachinery/pkg/runtime/serializer/codec_factory.go
func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory) []serializerType {
	jsonSerializer := json.NewSerializer(mf, scheme, scheme, false)
	jsonPrettySerializer := json.NewSerializer(mf, scheme, scheme, true)
	yamlSerializer := json.NewYAMLSerializer(mf, scheme, scheme)

	serializers := []serializerType{
		{
			AcceptContentTypes: []string{"application/json"},
			ContentType:        "application/json",
			FileExtensions:     []string{"json"},
			EncodesAsText:      true,
			Serializer:         jsonSerializer,
			PrettySerializer:   jsonPrettySerializer,

			Framer:           json.Framer,
			StreamSerializer: jsonSerializer,
		},
		{
			AcceptContentTypes: []string{"application/yaml"},
			ContentType:        "application/yaml",
			FileExtensions:     []string{"yaml"},
			EncodesAsText:      true,
			Serializer:         yamlSerializer,
		},
	}

	for _, fn := range serializerExtensions {
		if serializer, ok := fn(scheme); ok {
			serializers = append(serializers, serializer)
		}
	}
	return serializers
}
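
The serializer list built above is essentially a table keyed by media type. A reduced sketch of that idea, using only encoding/json, is shown below. serializerInfo, supported, decoderFor, and podName are hypothetical names, and only a JSON entry is registered; it illustrates the lookup-then-decode shape rather than the actual negotiation code in apimachinery.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// serializerInfo is a hypothetical, reduced version of serializerType:
// a media type plus the function that decodes it.
type serializerInfo struct {
	ContentType string
	Decode      func(data []byte, into interface{}) error
}

// supported lists the registered serializers; only JSON is wired up here.
var supported = []serializerInfo{
	{ContentType: "application/json", Decode: json.Unmarshal},
}

// decoderFor picks the serializer whose media type matches contentType.
func decoderFor(contentType string) (serializerInfo, bool) {
	for _, s := range supported {
		if s.ContentType == contentType {
			return s, true
		}
	}
	return serializerInfo{}, false
}

// podName decodes a response body with the matching serializer and
// returns its metadata.name field.
func podName(contentType string, body []byte) (string, error) {
	s, ok := decoderFor(contentType)
	if !ok {
		return "", fmt.Errorf("no serializer for %q", contentType)
	}
	var obj struct {
		Metadata struct {
			Name string `json:"name"`
		} `json:"metadata"`
	}
	if err := s.Decode(body, &obj); err != nil {
		return "", err
	}
	return obj.Metadata.Name, nil
}

func main() {
	name, err := podName("application/json", []byte(`{"metadata":{"name":"coredns"}}`))
	fmt.Println(name, err)
}
```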

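ClientSet

RESTClient is the most basic client. Using it requires specifying the Resource, the Version, and related details, so you need to know in advance which Group a Resource belongs to and what its Version is. ClientSet builds on RESTClient and wraps the management of Resources and Versions: each Resource and Version is exposed to developers as a function.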

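Note
ClientSet can only access Kubernetes built-in resources (the resources inside the client set); it cannot directly access CRD custom resources. To access CRDs through a ClientSet, regenerate the ClientSet with the client-gen code generator, which automatically generates the CRD-related code in the ClientSet.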
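
What "exposing each Resource and Version as a function" buys you can be seen in a toy model. In the sketch below, clientset, coreV1, appsV1, and resourcePath are hypothetical types and helpers that return URL paths instead of performing requests. The path layout itself follows the Kubernetes API convention: the core group lives under /api/<version>, named groups under /apis/<group>/<version>.

```go
package main

import "fmt"

// resourcePath is a hypothetical helper that builds the REST path for a
// resource from its group, version, and namespace.
func resourcePath(group, version, namespace, resource string) string {
	prefix := "/apis/" + group + "/" + version
	if group == "" {
		prefix = "/api/" + version // the core group lives under /api
	}
	if namespace != "" {
		prefix += "/namespaces/" + namespace
	}
	return prefix + "/" + resource
}

// coreV1 and appsV1 model per-group/version clients; the caller never has
// to spell out the group or version by hand.
type coreV1 struct{}
type appsV1 struct{}

func (coreV1) Pods(ns string) string {
	return resourcePath("", "v1", ns, "pods")
}

func (appsV1) Deployments(ns string) string {
	return resourcePath("apps", "v1", ns, "deployments")
}

// clientset groups the typed clients, in the style of clientset.CoreV1().
type clientset struct{}

func (clientset) CoreV1() coreV1 { return coreV1{} }
func (clientset) AppsV1() appsV1 { return appsV1{} }

func main() {
	cs := clientset{}
	fmt.Println(cs.CoreV1().Pods("kube-system"))
	fmt.Println(cs.AppsV1().Deployments("default"))
}
```

The trade-off is visible even in the toy: every resource needs its own method, which is why resources outside the generated set (such as CRDs) are not reachable without regenerating the code.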

Creating a Client

  1. func NewForConfig(c *rest.Config) (*Clientset, error)
  2. func NewForConfigOrDie(c *rest.Config) *Clientset
  3. func New(c rest.Interface) *Clientset

Usage Example

package main

import (
	"fmt"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
	if err != nil {
		panic(err)
	}

	clientset := kubernetes.NewForConfigOrDie(config)

	podClient := clientset.CoreV1().Pods("kube-system")

	list, err := podClient.List(metav1.ListOptions{Limit: 500})
	if err != nil {
		panic(err)
	}

	for _, item := range list.Items {
		fmt.Printf("Namespace:%v \t status:%+v \t Name:%v\n", item.Namespace, item.Status.Phase, item.Name)
	}
}

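DynamicClient

DynamicClient is a dynamic client that can perform RESTful operations on any Kubernetes resource, including CRD custom resources. The biggest difference between DynamicClient and ClientSet is that ClientSet needs an implementation in advance for every Resource and Version and works with structured data, whereas DynamicClient works with Unstructured, a type for handling unstructured data.

Note
DynamicClient is not type-safe, so take particular care when accessing CRD custom resources. For example, mishandling a pointer can crash the program.

DynamicClient converts a Resource (for example PodList) into the Unstructured type; every Kubernetes Resource can be converted into it. After processing, the Unstructured value is converted back into the exact type you want. The whole process resembles an interface{} type assertion in Go. Unstructured itself is backed by a map[string]interface{}.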
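
The map-to-struct step can be imitated with the standard library alone. The sketch below is an analogy, not the client-go implementation: fromUnstructured round-trips through encoding/json, while runtime.DefaultUnstructuredConverter works differently internally. The pod struct, fromUnstructured, and nestedString are hypothetical names. nestedString also shows the kind of checked access that avoids the crashes mentioned in the note above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// pod is a tiny typed struct standing in for corev1.Pod.
type pod struct {
	Metadata struct {
		Name      string `json:"name"`
		Namespace string `json:"namespace"`
	} `json:"metadata"`
	Status struct {
		Phase string `json:"phase"`
	} `json:"status"`
}

// fromUnstructured converts generic map data into a typed value by
// marshalling it to JSON and unmarshalling it into out.
func fromUnstructured(u map[string]interface{}, out interface{}) error {
	data, err := json.Marshal(u)
	if err != nil {
		return err
	}
	return json.Unmarshal(data, out)
}

// nestedString reads a string at the given key path. Every type assertion
// is checked, so a missing or mistyped field yields false instead of a panic.
func nestedString(u map[string]interface{}, keys ...string) (string, bool) {
	var cur interface{} = u
	for _, k := range keys {
		m, ok := cur.(map[string]interface{})
		if !ok {
			return "", false
		}
		cur = m[k]
	}
	s, ok := cur.(string)
	return s, ok
}

func main() {
	u := map[string]interface{}{
		"metadata": map[string]interface{}{"name": "coredns", "namespace": "kube-system"},
		"status":   map[string]interface{}{"phase": "Running"},
	}
	var p pod
	if err := fromUnstructured(u, &p); err != nil {
		panic(err)
	}
	phase, _ := nestedString(u, "status", "phase")
	fmt.Println(p.Metadata.Namespace, p.Metadata.Name, phase)
}
```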

Creating a Client

  1. func NewForConfigOrDie(c *rest.Config) Interface
  2. func NewForConfig(inConfig *rest.Config) (Interface, error)

Usage Example

package main

import (
	"fmt"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
	if err != nil {
		panic(err)
	}

	dynamicclient := dynamic.NewForConfigOrDie(config)

	gvr := schema.GroupVersionResource{Version: "v1", Resource: "pods"}
	unstructObj, err := dynamicclient.Resource(gvr).Namespace("kube-system").List(metav1.ListOptions{Limit: 100})
	if err != nil {
		panic(err)
	}

	list := &corev1.PodList{}
	err = runtime.DefaultUnstructuredConverter.FromUnstructured(unstructObj.UnstructuredContent(), list)
	if err != nil {
		panic(err)
	}

	for _, item := range list.Items {
		fmt.Printf("Namespace:%v \t status:%+v \t Name:%v\n", item.Namespace, item.Status.Phase, item.Name)
	}
}

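DiscoveryClient

DiscoveryClient is the discovery client. It is mainly used to discover the resource groups, resource versions, and resources that the Kubernetes API Server supports.

The output of the kubectl api-versions and api-resources commands is also produced through DiscoveryClient. Like the other clients, DiscoveryClient is a wrapper built on RESTClient.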
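
Discovery ultimately comes down to reading JSON documents such as the APIGroupList served under /apis. The sketch below parses a hand-written sample of that shape and prints group/version strings similar to what kubectl api-versions lists. apiGroupList, groupVersions, and the sample payload are illustrative assumptions: the struct keeps only the fields needed here, and the payload was not captured from a real cluster.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// apiGroupList mirrors only the fields of an APIGroupList document that
// this sketch needs.
type apiGroupList struct {
	Groups []struct {
		Name     string `json:"name"`
		Versions []struct {
			GroupVersion string `json:"groupVersion"`
		} `json:"versions"`
	} `json:"groups"`
}

// groupVersions extracts every "group/version" string from a discovery payload.
func groupVersions(payload []byte) ([]string, error) {
	var list apiGroupList
	if err := json.Unmarshal(payload, &list); err != nil {
		return nil, err
	}
	var out []string
	for _, g := range list.Groups {
		for _, v := range g.Versions {
			out = append(out, v.GroupVersion)
		}
	}
	return out, nil
}

// sample is a hand-written, illustrative payload.
const sample = `{"kind":"APIGroupList","groups":[
 {"name":"apps","versions":[{"groupVersion":"apps/v1","version":"v1"}]},
 {"name":"batch","versions":[
   {"groupVersion":"batch/v1","version":"v1"},
   {"groupVersion":"batch/v1beta1","version":"v1beta1"}]}]}`

func main() {
	gvs, err := groupVersions([]byte(sample))
	if err != nil {
		panic(err)
	}
	for _, gv := range gvs {
		fmt.Println(gv)
	}
}
```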

ScaleClient

TODO