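Introduction to client-go
Kubernetes uses client-go as its official Go client library for programmatic access to the Kubernetes API Server. Developers commonly build on top of Kubernetes with client-go, so it is a library worth knowing well.
The environment used in this article:
- OS: Ubuntu 20.04.1 LTS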
- Kubernetes Branch: release-1.14
- Golang Version: 1.14.7
- Goland Version: 2020.2
Source directory layout
The client-go source tree looks like this:
lee@u:~/go/src/k8s.io/kubernetes/staging/src/k8s.io/client-go$ tree -L 1 -d
.
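├── deprecated-dynamic # deprecated
├── discovery # provides DiscoveryClient, the discovery client
├── dynamic # provides DynamicClient, the dynamic client
├── examples # client-go usage examples
├── Godeps # files generated by godep; can be ignored
├── informers # Informer implementations for each Kubernetes resource
├── kubernetes # provides ClientSet
├── kubernetes_test # ClientSet tests
├── listers # provides a Lister for each Kubernetes resource, serving Get and List requests from a read-only cache
├── pkg
├── plugin # provides authentication plugins for cloud providers such as OpenStack and Azure
├── rest # provides RESTClient, which wraps RESTful operations against the Kubernetes API Server
├── restmapper
├── scale # provides ScaleClient, used for scaling workloads up and down
├── testing
├── third_party
├── tools # provides common tools
├── transport # provides secure TCP connections and supports HTTP streaming
└── util # provides common helper functions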
Client objects
Before looking at the clients themselves, let's look at the configuration needed to initialize one: rest.Config.
Client configuration: rest.Config
staging/src/k8s.io/client-go/rest/config.go
// Config holds the common attributes that can be passed to a Kubernetes client on
// initialization.
type Config struct {
// Host must be a host string, a host:port pair, or a URL to the base of the apiserver.
// If a URL is given then the (optional) Path of that URL represents a prefix that must
// be appended to all request URIs used to access the apiserver. This allows a frontend
// proxy to easily relocate all of the apiserver endpoints.
Host string
// APIPath is a sub-path that points to an API root.
APIPath string
// ContentConfig contains settings that affect how objects are transformed when
// sent to the server.
ContentConfig
// Server requires Basic authentication
Username string
Password string
// Server requires Bearer authentication. This client will not attempt to use
// refresh tokens for an OAuth2 flow.
// TODO: demonstrate an OAuth2 compatible client.
BearerToken string
// Path to a file containing a BearerToken.
// If set, the contents are periodically read.
// The last successfully read value takes precedence over BearerToken.
BearerTokenFile string
// Impersonate is the configuration that RESTClient will use for impersonation.
Impersonate ImpersonationConfig
// Server requires plugin-specified authentication.
AuthProvider *clientcmdapi.AuthProviderConfig
// Callback to persist config for AuthProvider.
AuthConfigPersister AuthProviderConfigPersister
// Exec-based authentication provider.
ExecProvider *clientcmdapi.ExecConfig
// TLSClientConfig contains settings to enable transport layer security
TLSClientConfig
// UserAgent is an optional field that specifies the caller of this request.
UserAgent string
// Transport may be used for custom HTTP behavior. This attribute may not
// be specified with the TLS client certificate options. Use WrapTransport
// to provide additional per-server middleware behavior.
Transport http.RoundTripper
// WrapTransport will be invoked for custom HTTP behavior after the underlying
// transport is initialized (either the transport created from TLSClientConfig,
// Transport, or http.DefaultTransport). The config may layer other RoundTrippers
// on top of the returned RoundTripper.
//
// A future release will change this field to an array. Use config.Wrap()
// instead of setting this value directly.
WrapTransport transport.WrapperFunc
// QPS indicates the maximum QPS to the master from this client.
// If it's zero, the created RESTClient will use DefaultQPS: 5
QPS float32
// Maximum burst for throttle.
// If it's zero, the created RESTClient will use DefaultBurst: 10.
Burst int
// Rate limiter for limiting connections to the master from this client. If present overwrites QPS/Burst
RateLimiter flowcontrol.RateLimiter
// The maximum length of time to wait before giving up on a server request. A value of zero means no timeout.
Timeout time.Duration
// Dial specifies the dial function for creating unencrypted TCP connections.
Dial func(ctx context.Context, network, address string) (net.Conn, error)
// Version forces a specific version to be used (if registered)
// Do we need this?
// Version string
}
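The QPS and Burst fields describe a token-bucket style limit: Burst is the size of the bucket and QPS is the refill rate. The following is a minimal sketch of that idea using only the standard library. It is not client-go's flowcontrol implementation, and the names tokenBucket, newTokenBucket, tryAccept, countAccepted, and demo are hypothetical. The clock is passed in explicitly so that the behavior is deterministic.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// tokenBucket is a hypothetical, simplified rate limiter: it holds at most
// burst tokens and refills at qps tokens per second.
type tokenBucket struct {
	qps    float64
	burst  float64
	tokens float64
	last   time.Time
}

// newTokenBucket starts with a full bucket, so an initial burst is allowed.
func newTokenBucket(qps float64, burst int, now time.Time) *tokenBucket {
	return &tokenBucket{qps: qps, burst: float64(burst), tokens: float64(burst), last: now}
}

// tryAccept refills the bucket for the time elapsed since the last call and
// then takes one token if one is available.
func (b *tokenBucket) tryAccept(now time.Time) bool {
	elapsed := now.Sub(b.last).Seconds()
	b.last = now
	b.tokens = math.Min(b.burst, b.tokens+elapsed*b.qps)
	if b.tokens < 1 {
		return false
	}
	b.tokens--
	return true
}

// countAccepted reports how many of n back-to-back requests pass at time now.
func countAccepted(b *tokenBucket, n int, now time.Time) int {
	accepted := 0
	for i := 0; i < n; i++ {
		if b.tryAccept(now) {
			accepted++
		}
	}
	return accepted
}

// demo sends 20 requests at once, then 20 more one second later, using the
// defaults quoted in the struct comments (QPS 5, Burst 10).
func demo() (int, int) {
	start := time.Unix(0, 0)
	b := newTokenBucket(5, 10, start)
	first := countAccepted(b, 20, start)
	second := countAccepted(b, 20, start.Add(time.Second))
	return first, second
}

func main() {
	fmt.Println(demo()) // prints "10 5"
}
```

With these defaults, a client can send 10 requests immediately and then roughly 5 per second. Setting RateLimiter on the config replaces this behavior entirely, as the field comment states.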
A rest.Config object can be obtained by calling the BuildConfigFromFlags function.
staging/src/k8s.io/client-go/tools/clientcmd/client_config.go
// BuildConfigFromFlags is a helper function that builds configs from a master
// url or a kubeconfig filepath. These are passed in as command line flags for cluster
// components. Warnings should reflect this usage. If neither masterUrl or kubeconfigPath
// are passed in we fallback to inClusterConfig. If inClusterConfig fails, we fallback
// to the default config.
func BuildConfigFromFlags(masterUrl, kubeconfigPath string) (*restclient.Config, error) {
if kubeconfigPath == "" && masterUrl == "" {
klog.Warningf("Neither --kubeconfig nor --master was specified. Using the inClusterConfig. This might not work.")
kubeconfig, err := restclient.InClusterConfig()
if err == nil {
return kubeconfig, nil
}
klog.Warning("error creating inClusterConfig, falling back to default config: ", err)
}
return NewNonInteractiveDeferredLoadingClientConfig(
&ClientConfigLoadingRules{ExplicitPath: kubeconfigPath},
&ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: masterUrl}}).ClientConfig()
}
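If neither masterUrl nor kubeconfigPath is passed, the in-cluster configuration returned by InClusterConfig is used. If that also fails, the function falls back to the default loading rules.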
staging/src/k8s.io/client-go/rest/config.go
// InClusterConfig returns a config object which uses the service account
// kubernetes gives to pods. It's intended for clients that expect to be
// running inside a pod running on kubernetes. It will return ErrNotInCluster
// if called from a process not running in a kubernetes environment.
func InClusterConfig() (*Config, error) {
const (
tokenFile = "/var/run/secrets/kubernetes.io/serviceaccount/token"
rootCAFile = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
)
host, port := os.Getenv("KUBERNETES_SERVICE_HOST"), os.Getenv("KUBERNETES_SERVICE_PORT")
if len(host) == 0 || len(port) == 0 {
return nil, ErrNotInCluster
}
token, err := ioutil.ReadFile(tokenFile)
if err != nil {
return nil, err
}
tlsClientConfig := TLSClientConfig{}
if _, err := certutil.NewPool(rootCAFile); err != nil {
klog.Errorf("Expected to load root CA config from %s, but got err: %v", rootCAFile, err)
} else {
tlsClientConfig.CAFile = rootCAFile
}
return &Config{
// TODO: switch to using cluster DNS.
Host: "https://" + net.JoinHostPort(host, port),
TLSClientConfig: tlsClientConfig,
BearerToken: string(token),
BearerTokenFile: tokenFile,
}, nil
}
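The host detection at the top of InClusterConfig can be tried outside a pod by injecting the environment lookup. The sketch below is a simplified illustration using only the standard library: errNotInCluster stands in for rest.ErrNotInCluster, and inClusterHost and fakeEnv are hypothetical helper names. Reading the token and the CA file is deliberately left out.

```go
package main

import (
	"errors"
	"fmt"
	"net"
)

// errNotInCluster is a hypothetical stand-in for rest.ErrNotInCluster.
var errNotInCluster = errors.New("not running inside a cluster")

// inClusterHost builds the API server URL from the two service environment
// variables. getenv is injected so the function can be exercised without a pod.
func inClusterHost(getenv func(string) string) (string, error) {
	host, port := getenv("KUBERNETES_SERVICE_HOST"), getenv("KUBERNETES_SERVICE_PORT")
	if host == "" || port == "" {
		return "", errNotInCluster
	}
	// JoinHostPort brackets IPv6 literals, which plain concatenation would not.
	return "https://" + net.JoinHostPort(host, port), nil
}

// fakeEnv returns a getenv function backed by a map (illustration only).
func fakeEnv(vars map[string]string) func(string) string {
	return func(key string) string { return vars[key] }
}

func main() {
	inPod := fakeEnv(map[string]string{
		"KUBERNETES_SERVICE_HOST": "10.96.0.1",
		"KUBERNETES_SERVICE_PORT": "443",
	})
	fmt.Println(inClusterHost(inPod))
	fmt.Println(inClusterHost(fakeEnv(nil)))
}
```

In a real program, os.Getenv would be passed as the getenv argument.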
Otherwise, the rest.Config is obtained through a ClientConfig, which is explained in detail below.
The ClientConfig interface
staging/src/k8s.io/client-go/tools/clientcmd/client_config.go
// ClientConfig is used to make it easy to get an api server client
type ClientConfig interface {
// RawConfig returns the merged result of all overrides
RawConfig() (clientcmdapi.Config, error)
// ClientConfig returns a complete client config
ClientConfig() (*restclient.Config, error)
// Namespace returns the namespace resulting from the merged
// result of all overrides and a boolean indicating if it was
// overridden
Namespace() (string, bool, error)
// ConfigAccess returns the rules for loading/persisting the config.
ConfigAccess() ConfigAccess
}
Three structs implement the ClientConfig interface: DeferredLoadingClientConfig, DirectClientConfig, and inClusterClientConfig.
How the config is obtained
BuildConfigFromFlags ends by calling NewNonInteractiveDeferredLoadingClientConfig, which actually returns a DeferredLoadingClientConfig object.
staging/src/k8s.io/client-go/tools/clientcmd/merged_client_builder.go
// NewNonInteractiveDeferredLoadingClientConfig creates a ConfigClientClientConfig using the passed context name
func NewNonInteractiveDeferredLoadingClientConfig(loader ClientConfigLoader, overrides *ConfigOverrides) ClientConfig {
return &DeferredLoadingClientConfig{loader: loader, overrides: overrides, icc: &inClusterClientConfig{overrides: overrides}}
}
The core logic for obtaining the rest.Config is in the ClientConfig method of DeferredLoadingClientConfig.
staging/src/k8s.io/client-go/tools/clientcmd/merged_client_builder.go
// ClientConfig implements ClientConfig
func (config *DeferredLoadingClientConfig) ClientConfig() (*restclient.Config, error) {
mergedClientConfig, err := config.createClientConfig()
if err != nil {
return nil, err
}
// load the configuration and return on non-empty errors and if the
// content differs from the default config
mergedConfig, err := mergedClientConfig.ClientConfig()
switch {
case err != nil:
if !IsEmptyConfig(err) {
// return on any error except empty config
return nil, err
}
case mergedConfig != nil:
// the configuration is valid, but if this is equal to the defaults we should try
// in-cluster configuration
if !config.loader.IsDefaultConfig(mergedConfig) {
return mergedConfig, nil
}
}
// check for in-cluster configuration and use it
if config.icc.Possible() {
klog.V(4).Infof("Using in-cluster configuration")
return config.icc.ClientConfig()
}
// return the result of the merged client config
return mergedConfig, err
}
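As the code shows, config.createClientConfig() creates an object that satisfies the ClientConfig interface, and that object's ClientConfig() method is then called to obtain the rest.Config. The core logic therefore lives in the ClientConfig method of the mergedClientConfig object. Let's look at createClientConfig first.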
staging/src/k8s.io/client-go/tools/clientcmd/merged_client_builder.go
func (config *DeferredLoadingClientConfig) createClientConfig() (ClientConfig, error) {
if config.clientConfig == nil {
config.loadingLock.Lock()
defer config.loadingLock.Unlock()
if config.clientConfig == nil {
mergedConfig, err := config.loader.Load()
if err != nil {
return nil, err
}
var mergedClientConfig ClientConfig
if config.fallbackReader != nil {
mergedClientConfig = NewInteractiveClientConfig(*mergedConfig, config.overrides.CurrentContext, config.overrides, config.fallbackReader, config.loader)
} else {
mergedClientConfig = NewNonInteractiveClientConfig(*mergedConfig, config.overrides.CurrentContext, config.overrides, config.loader)
}
config.clientConfig = mergedClientConfig
}
}
return config.clientConfig, nil
}
Core logic:
- Call config.loader.Load(), which uses the Load method of the ClientConfigLoadingRules object to load and merge the configuration and returns a clientcmdapi.Config.
- Call NewInteractiveClientConfig or NewNonInteractiveClientConfig, which returns a DirectClientConfig object.
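The deferred part of createClientConfig is a lazy initialization: nothing is loaded until the config is first needed, and a successful result is cached. The following sketch shows that pattern in isolation, under simplifying assumptions. The names lazyConfig, get, and demo are hypothetical, a plain string stands in for the merged config, and the lock is always taken instead of using the double check from the excerpt above.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// lazyConfig is a hypothetical stand-in for DeferredLoadingClientConfig: it
// defers an expensive load until first use and caches a successful result.
type lazyConfig struct {
	mu     sync.Mutex
	load   func() (string, error) // stands in for loading and merging kubeconfig files
	cached *string
	calls  int // how many times load actually ran
}

// get loads on first use. A failed load is not cached, so a later call retries.
func (c *lazyConfig) get() (string, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.cached != nil {
		return *c.cached, nil
	}
	c.calls++
	v, err := c.load()
	if err != nil {
		return "", err
	}
	c.cached = &v
	return v, nil
}

// demo fails the first load, succeeds on the second, then reads the cache
// twice. It returns the final value and how many times load ran.
func demo() (string, int) {
	attempt := 0
	c := &lazyConfig{load: func() (string, error) {
		attempt++
		if attempt == 1 {
			return "", errors.New("kubeconfig not readable yet")
		}
		return "merged-config", nil
	}}
	c.get() // first attempt fails and is not cached
	var last string
	for i := 0; i < 3; i++ {
		last, _ = c.get()
	}
	return last, c.calls
}

func main() {
	fmt.Println(demo()) // prints "merged-config 2"
}
```

A sync.Once would be shorter, but it would also remember a failed first load; keeping an explicit mutex lets a later call retry, which matches the excerpt's behavior of returning early on error without storing anything.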
So the mergedClientConfig.ClientConfig() call above actually invokes the ClientConfig() method of the DirectClientConfig object. Its source is as follows:
staging/src/k8s.io/client-go/tools/clientcmd/client_config.go
// ClientConfig implements ClientConfig
func (config *DirectClientConfig) ClientConfig() (*restclient.Config, error) {
// check that getAuthInfo, getContext, and getCluster do not return an error.
// Do this before checking if the current config is usable in the event that an
// AuthInfo, Context, or Cluster config with user-defined names are not found.
// This provides a user with the immediate cause for error if one is found
configAuthInfo, err := config.getAuthInfo()
if err != nil {
return nil, err
}
_, err = config.getContext()
if err != nil {
return nil, err
}
configClusterInfo, err := config.getCluster()
if err != nil {
return nil, err
}
if err := config.ConfirmUsable(); err != nil {
return nil, err
}
clientConfig := &restclient.Config{}
clientConfig.Host = configClusterInfo.Server
if len(config.overrides.Timeout) > 0 {
timeout, err := ParseTimeout(config.overrides.Timeout)
if err != nil {
return nil, err
}
clientConfig.Timeout = timeout
}
if u, err := url.ParseRequestURI(clientConfig.Host); err == nil && u.Opaque == "" && len(u.Path) > 1 {
u.RawQuery = ""
u.Fragment = ""
clientConfig.Host = u.String()
}
if len(configAuthInfo.Impersonate) > 0 {
clientConfig.Impersonate = restclient.ImpersonationConfig{
UserName: configAuthInfo.Impersonate,
Groups: configAuthInfo.ImpersonateGroups,
Extra: configAuthInfo.ImpersonateUserExtra,
}
}
// only try to read the auth information if we are secure
if restclient.IsConfigTransportTLS(*clientConfig) {
var err error
var persister restclient.AuthProviderConfigPersister
if config.configAccess != nil {
authInfoName, _ := config.getAuthInfoName()
persister = PersisterForUser(config.configAccess, authInfoName)
}
userAuthPartialConfig, err := config.getUserIdentificationPartialConfig(configAuthInfo, config.fallbackReader, persister)
if err != nil {
return nil, err
}
mergo.MergeWithOverwrite(clientConfig, userAuthPartialConfig)
serverAuthPartialConfig, err := getServerIdentificationPartialConfig(configAuthInfo, configClusterInfo)
if err != nil {
return nil, err
}
mergo.MergeWithOverwrite(clientConfig, serverAuthPartialConfig)
}
return clientConfig, nil
}
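As shown, the function initializes a rest.Config object and returns it.
RESTClient
RESTClient is the most basic client; all the other clients are built on top of it. RESTClient wraps HTTP requests and exposes a RESTful-style API. It is very flexible: the data is not tied to particular methods or resources, so RESTClient can handle many kinds of calls and return different data formats.
RESTClient structure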

Three functions can be used to create a RESTClient object; the latter two both call NewRESTClient to do so.
- NewRESTClient(…) (*RESTClient, error)
- RESTClientFor(config *Config) (*RESTClient, error)
- UnversionedRESTClientFor(config *Config) (*RESTClient, error)
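Methods such as Get, Post, and Patch call the Verb method, and all of them return a Request object. Understanding Request is the key to understanding RESTClient.
Request structure
Request lets you build a request to the server in a chained style. The request is finally executed by calling the Do method.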
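To make the chained style concrete, here is a much-reduced model of such a builder. It is a sketch under stated assumptions, not the rest.Request type: the request struct, newRequest, and the Param setter are hypothetical, and only the URL assembly is modeled (no body, headers, or execution).

```go
package main

import (
	"fmt"
	"net/url"
	"path"
)

// request is a hypothetical, much-reduced model of a chained request builder.
// Each setter returns the receiver so that calls can be chained.
type request struct {
	verb      string
	prefix    string // for example "/api/v1"
	namespace string
	resource  string
	name      string
	params    url.Values
}

func newRequest(verb, prefix string) *request {
	return &request{verb: verb, prefix: prefix, params: url.Values{}}
}

func (r *request) Namespace(ns string) *request { r.namespace = ns; return r }
func (r *request) Resource(res string) *request { r.resource = res; return r }
func (r *request) Name(name string) *request { r.name = name; return r }
func (r *request) Param(k, v string) *request { r.params.Set(k, v); return r }

// URL assembles <prefix>/namespaces/<ns>/<resource>/<name>?<params>,
// skipping the parts that were never set.
func (r *request) URL() string {
	p := r.prefix
	if r.namespace != "" {
		p = path.Join(p, "namespaces", r.namespace)
	}
	p = path.Join(p, r.resource, r.name) // Join ignores empty elements
	if len(r.params) > 0 {
		p += "?" + r.params.Encode()
	}
	return p
}

func main() {
	req := newRequest("GET", "/api/v1").
		Namespace("kube-system").
		Resource("pods").
		Param("limit", "500")
	fmt.Println(req.verb, req.URL())
}
```

Because every setter only records state, nothing touches the network until the final call in the chain, which is what makes the builder cheap to compose.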
staging/src/k8s.io/client-go/rest/request.go
// Do formats and executes the request. Returns a Result object for easy response
// processing.
//
// Error type:
// * If the request can't be constructed, or an error happened earlier while building its
// arguments: *RequestConstructionError
// * If the server responds with a status: *errors.StatusError or *errors.UnexpectedObjectError
// * http.Client.Do errors are returned directly.
func (r *Request) Do() Result {
r.tryThrottle()
var result Result
err := r.request(func(req *http.Request, resp *http.Response) {
result = r.transformResponse(resp, req)
})
if err != nil {
return Result{err: err}
}
return result
}
The Do method calls the request method:
staging/src/k8s.io/client-go/rest/request.go
// request connects to the server and invokes the provided function when a server response is
// received. It handles retry behavior and up front validation of requests. It will invoke
// fn at most once. It will return an error if a problem occurred prior to connecting to the
// server - the provided function is responsible for handling server errors.
func (r *Request) request(fn func(*http.Request, *http.Response)) error {
//Metrics for total request latency
start := time.Now()
defer func() {
metrics.RequestLatency.Observe(r.verb, r.finalURLTemplate(), time.Since(start))
}()
if r.err != nil {
klog.V(4).Infof("Error in request: %v", r.err)
return r.err
}
// TODO: added to catch programmer errors (invoking operations with an object with an empty namespace)
if (r.verb == "GET" || r.verb == "PUT" || r.verb == "DELETE") && r.namespaceSet && len(r.resourceName) > 0 && len(r.namespace) == 0 {
return fmt.Errorf("an empty namespace may not be set when a resource name is provided")
}
if (r.verb == "POST") && r.namespaceSet && len(r.namespace) == 0 {
return fmt.Errorf("an empty namespace may not be set during creation")
}
client := r.client
if client == nil {
client = http.DefaultClient
}
// Right now we make about ten retry attempts if we get a Retry-After response.
maxRetries := 10
retries := 0
for {
url := r.URL().String()
req, err := http.NewRequest(r.verb, url, r.body)
if err != nil {
return err
}
if r.timeout > 0 {
if r.ctx == nil {
r.ctx = context.Background()
}
var cancelFn context.CancelFunc
r.ctx, cancelFn = context.WithTimeout(r.ctx, r.timeout)
defer cancelFn()
}
if r.ctx != nil {
req = req.WithContext(r.ctx)
}
req.Header = r.headers
r.backoffMgr.Sleep(r.backoffMgr.CalculateBackoff(r.URL()))
if retries > 0 {
// We are retrying the request that we already send to apiserver
// at least once before.
// This request should also be throttled with the client-internal throttler.
r.tryThrottle()
}
resp, err := client.Do(req)
updateURLMetrics(r, resp, err)
if err != nil {
r.backoffMgr.UpdateBackoff(r.URL(), err, 0)
} else {
r.backoffMgr.UpdateBackoff(r.URL(), err, resp.StatusCode)
}
if err != nil {
// "Connection reset by peer" is usually a transient error.
// Thus in case of "GET" operations, we simply retry it.
// We are not automatically retrying "write" operations, as
// they are not idempotent.
if !net.IsConnectionReset(err) || r.verb != "GET" {
return err
}
// For the purpose of retry, we set the artificial "retry-after" response.
// TODO: Should we clean the original response if it exists?
resp = &http.Response{
StatusCode: http.StatusInternalServerError,
Header: http.Header{"Retry-After": []string{"1"}},
Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
}
}
done := func() bool {
// Ensure the response body is fully read and closed
// before we reconnect, so that we reuse the same TCP
// connection.
defer func() {
const maxBodySlurpSize = 2 << 10
if resp.ContentLength <= maxBodySlurpSize {
io.Copy(ioutil.Discard, &io.LimitedReader{R: resp.Body, N: maxBodySlurpSize})
}
resp.Body.Close()
}()
retries++
if seconds, wait := checkWait(resp); wait && retries < maxRetries {
if seeker, ok := r.body.(io.Seeker); ok && r.body != nil {
_, err := seeker.Seek(0, 0)
if err != nil {
klog.V(4).Infof("Could not retry request, can't Seek() back to beginning of body for %T", r.body)
fn(req, resp)
return true
}
}
klog.V(4).Infof("Got a Retry-After %ds response for attempt %d to %v", seconds, retries, url)
r.backoffMgr.Sleep(time.Duration(seconds) * time.Second)
return false
}
fn(req, resp)
return true
}()
if done {
return nil
}
}
}
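The retry behavior in the loop above can be reduced to two pieces: deciding whether a response asks the client to wait, and repeating the call up to a maximum number of attempts. The following is a simplified sketch of that control flow only. It assumes that only 429 and 5xx responses with an integer Retry-After header trigger a retry; retryAfter, doWithRetry, fakeServer, and demo are hypothetical names, and backoff, throttling, body rewinding, and metrics are omitted.

```go
package main

import (
	"fmt"
	"net/http"
	"strconv"
)

// retryAfter reports whether a response asks the client to wait and retry.
// Simplified assumption: only 429 and 5xx responses carrying an integer
// Retry-After header qualify.
func retryAfter(resp *http.Response) (int, bool) {
	if resp.StatusCode != http.StatusTooManyRequests && resp.StatusCode < 500 {
		return 0, false
	}
	seconds, err := strconv.Atoi(resp.Header.Get("Retry-After"))
	if err != nil {
		return 0, false
	}
	return seconds, true
}

// doWithRetry calls send until a response needs no retry or maxRetries
// attempts were made. sleep is injected so the sketch does not really wait.
func doWithRetry(send func() *http.Response, sleep func(seconds int), maxRetries int) (*http.Response, int) {
	attempts := 0
	for {
		resp := send()
		attempts++
		seconds, wait := retryAfter(resp)
		if !wait || attempts >= maxRetries {
			return resp, attempts
		}
		sleep(seconds)
	}
}

// fakeServer answers 429 with Retry-After: 1 for the first busy calls, then 200.
func fakeServer(busy int) func() *http.Response {
	calls := 0
	return func() *http.Response {
		calls++
		if calls <= busy {
			return &http.Response{
				StatusCode: http.StatusTooManyRequests,
				Header:     http.Header{"Retry-After": []string{"1"}},
			}
		}
		return &http.Response{StatusCode: http.StatusOK, Header: http.Header{}}
	}
}

// demo returns the final status code, the number of attempts, and the total
// number of seconds the client would have slept.
func demo(busy, maxRetries int) (int, int, int) {
	slept := 0
	resp, attempts := doWithRetry(fakeServer(busy), func(s int) { slept += s }, maxRetries)
	return resp.StatusCode, attempts, slept
}

func main() {
	fmt.Println(demo(2, 10))  // prints "200 3 2"
	fmt.Println(demo(20, 10)) // prints "429 10 9"
}
```

The second call shows the upper bound: once the attempt limit is reached, the last response is handed back to the caller even though the server is still asking for a retry.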
Usage example
The code below lists information about all Pod objects in the kube-system namespace.
package main
import (
"fmt"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
func main() {
config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
if err != nil {
panic(err)
}
config.APIPath = "api"
config.GroupVersion = &corev1.SchemeGroupVersion
config.NegotiatedSerializer = scheme.Codecs
restClient, err := rest.RESTClientFor(config)
if err != nil {
panic(err)
}
result := &corev1.PodList{}
err = restClient.Get().
Namespace("kube-system").
Resource("pods").
VersionedParams(&metav1.ListOptions{Limit: 500}, scheme.ParameterCodec).
Do().
Into(result)
if err != nil {
panic(err)
}
for _, item := range result.Items {
fmt.Printf("Namespace:%v \t status:%+v \t Name:%v\n", item.Namespace, item.Status.Phase, item.Name)
}
}
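config.NegotiatedSerializer is a required field here. It is used mainly in the following three places:
- in the NewRequest method that creates the request, to set the Accept header;
- in the Request.transformResponse method, to obtain a decoder;
- in the Result.Into method, to decode the response into a concrete object.
The example sets it to scheme.Codecs. The key code is as follows: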
// Path: staging/src/k8s.io/client-go/kubernetes/scheme/register.go
var Codecs = serializer.NewCodecFactory(Scheme)
// Path: staging/src/k8s.io/apimachinery/pkg/runtime/serializer/codec_factory.go
// NewCodecFactory provides methods for retrieving serializers for the supported wire formats
// and conversion wrappers to define preferred internal and external versions. In the future,
// as the internal version is used less, callers may instead use a defaulting serializer and
// only convert objects which are shared internally (Status, common API machinery).
// TODO: allow other codecs to be compiled in?
// TODO: accept a scheme interface
func NewCodecFactory(scheme *runtime.Scheme) CodecFactory {
serializers := newSerializersForScheme(scheme, json.DefaultMetaFactory)
return newCodecFactory(scheme, serializers)
}
// Path: staging/src/k8s.io/apimachinery/pkg/runtime/serializer/codec_factory.go
func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory) []serializerType {
jsonSerializer := json.NewSerializer(mf, scheme, scheme, false)
jsonPrettySerializer := json.NewSerializer(mf, scheme, scheme, true)
yamlSerializer := json.NewYAMLSerializer(mf, scheme, scheme)
serializers := []serializerType{
{
AcceptContentTypes: []string{"application/json"},
ContentType: "application/json",
FileExtensions: []string{"json"},
EncodesAsText: true,
Serializer: jsonSerializer,
PrettySerializer: jsonPrettySerializer,
Framer: json.Framer,
StreamSerializer: jsonSerializer,
},
{
AcceptContentTypes: []string{"application/yaml"},
ContentType: "application/yaml",
FileExtensions: []string{"yaml"},
EncodesAsText: true,
Serializer: yamlSerializer,
},
}
for _, fn := range serializerExtensions {
if serializer, ok := fn(scheme); ok {
serializers = append(serializers, serializer)
}
}
return serializers
}
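ClientSet
RESTClient is the most basic kind of client. Using it requires specifying the Resource, the Version, and so on, so when writing code you must already know which Group a Resource belongs to and which Version it has. ClientSet builds on RESTClient by wrapping the management of Resource and Version: every Resource and Version is exposed to developers as a function.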
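What a ClientSet saves you from is mostly bookkeeping about groups and versions. The sketch below models that idea with the standard library only: the core group (empty group name) is served under /api, named groups under /apis, and a typed method fixes both for you. The types groupVersion and typedClient and the methods coreV1Pods and appsV1Deployments are hypothetical stand-ins for calls such as CoreV1().Pods(ns).

```go
package main

import (
	"fmt"
	"path"
)

// groupVersion is a hypothetical mirror of a group/version pair.
type groupVersion struct {
	Group   string
	Version string
}

// apiPrefix returns the URL prefix for a group/version: the core group
// (empty group name) lives under /api, every named group under /apis.
func apiPrefix(gv groupVersion) string {
	if gv.Group == "" {
		return path.Join("/api", gv.Version)
	}
	return path.Join("/apis", gv.Group, gv.Version)
}

// typedClient models how a ClientSet hides group and version behind methods.
type typedClient struct{}

// coreV1Pods fixes group "" and version "v1" so the caller only picks a namespace.
func (typedClient) coreV1Pods(ns string) string {
	return path.Join(apiPrefix(groupVersion{Version: "v1"}), "namespaces", ns, "pods")
}

// appsV1Deployments fixes group "apps" and version "v1".
func (typedClient) appsV1Deployments(ns string) string {
	return path.Join(apiPrefix(groupVersion{Group: "apps", Version: "v1"}), "namespaces", ns, "deployments")
}

func main() {
	var c typedClient
	fmt.Println(c.coreV1Pods("kube-system"))
	fmt.Println(c.appsV1Deployments("default"))
}
```

With RESTClient, the caller supplies the equivalent of apiPrefix by setting APIPath and GroupVersion on the config; with a ClientSet, the method name carries that information.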
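Note
ClientSet can only access the resources built into Kubernetes (that is, the resources included in the client set); it cannot directly access CRD custom resources. To access CRDs through a ClientSet, regenerate the ClientSet with the client-gen code generator so that the code for the CRD operations is generated into the ClientSet.
Creating a client
func NewForConfig(c *rest.Config) (*Clientset, error)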
func NewForConfigOrDie(c *rest.Config) *Clientset
func New(c rest.Interface) *Clientset
Usage example
package main
import (
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)
func main() {
config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
if err != nil {
panic(err)
}
clientset := kubernetes.NewForConfigOrDie(config)
podClient := clientset.CoreV1().Pods("kube-system")
list, err := podClient.List(metav1.ListOptions{Limit: 500})
if err != nil {
panic(err)
}
for _, item := range list.Items {
fmt.Printf("Namespace:%v \t status:%+v \t Name:%v\n", item.Namespace, item.Status.Phase, item.Name)
}
}
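DynamicClient
DynamicClient is a dynamic client that can perform RESTful operations on any Kubernetes resource, including CRD custom resources. The biggest difference from ClientSet is that ClientSet needs the operations for each Resource and Version to be implemented in advance and works with structured data internally, whereas DynamicClient is built around Unstructured, which handles unstructured data.
Note
DynamicClient is not type-safe, so take extra care when accessing CRD custom resources. For example, mishandling pointers can crash the program.
DynamicClient first converts a Resource (for example a PodList) into the Unstructured type; every Kubernetes Resource can be converted to it. After processing, the Unstructured value is converted back into the exact type you want. The whole process resembles a type assertion on interface{} in Go. The Unstructured type itself is backed by a map[string]interface{}.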
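The step from generic map data to a typed object can be illustrated with a JSON round trip. This is only an analogy under stated assumptions: it is not how runtime.DefaultUnstructuredConverter is implemented, and pod, podList, fromUnstructured, and sample are hypothetical, heavily trimmed names used for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// pod and podList are hypothetical, heavily trimmed versions of the typed objects.
type pod struct {
	Metadata struct {
		Name      string `json:"name"`
		Namespace string `json:"namespace"`
	} `json:"metadata"`
	Status struct {
		Phase string `json:"phase"`
	} `json:"status"`
}

type podList struct {
	Items []pod `json:"items"`
}

// fromUnstructured converts generic map data into a typed value by encoding
// it to JSON and decoding it again. Fields unknown to the target are dropped.
func fromUnstructured(u map[string]interface{}, out interface{}) error {
	raw, err := json.Marshal(u)
	if err != nil {
		return err
	}
	return json.Unmarshal(raw, out)
}

// sample returns hand-written data shaped like an unstructured pod list.
func sample() map[string]interface{} {
	return map[string]interface{}{
		"kind": "PodList",
		"items": []interface{}{
			map[string]interface{}{
				"metadata": map[string]interface{}{"name": "coredns-0", "namespace": "kube-system"},
				"status":   map[string]interface{}{"phase": "Running"},
			},
		},
	}
}

func main() {
	var list podList
	if err := fromUnstructured(sample(), &list); err != nil {
		panic(err)
	}
	for _, p := range list.Items {
		fmt.Println(p.Metadata.Namespace, p.Status.Phase, p.Metadata.Name)
	}
}
```

Reading the same fields straight from the map would need a chain of type assertions, each of which can fail at run time; converting once to a typed value moves those checks to a single place.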
Creating a client
- func NewForConfigOrDie(c *rest.Config) Interface
- func NewForConfig(inConfig *rest.Config) (Interface, error)
Usage example
package main
import (
"fmt"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/clientcmd"
)
func main() {
config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
if err != nil {
panic(err)
}
dynamicclient := dynamic.NewForConfigOrDie(config)
gvr := schema.GroupVersionResource{Version: "v1", Resource: "pods"}
unstructObj, err := dynamicclient.Resource(gvr).Namespace("kube-system").List(metav1.ListOptions{Limit: 100})
if err != nil {
panic(err)
}
list := &corev1.PodList{}
err = runtime.DefaultUnstructuredConverter.FromUnstructured(unstructObj.UnstructuredContent(), list)
if err != nil {
panic(err)
}
for _, item := range list.Items {
fmt.Printf("Namespace:%v \t status:%+v \t Name:%v\n", item.Namespace, item.Status.Phase, item.Name)
}
}
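DiscoveryClient
DiscoveryClient is the discovery client. It is mainly used to discover the resource groups, resource versions, and resources supported by the Kubernetes API Server. The output of kubectl's api-versions and api-resources commands is also produced through DiscoveryClient. Like the other clients, DiscoveryClient is a wrapper built on top of RESTClient.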
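As a rough illustration of what discovery works with, the sketch below parses a group list document of the kind the API server returns for its named groups and prints the sorted group/version strings, similar in spirit to kubectl api-versions. It uses only the standard library and does not call DiscoveryClient. The struct apiGroupList keeps just the fields needed here, groupVersions is a hypothetical helper, and sampleBody is hand-written illustration data, not output captured from a real cluster.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sort"
)

// apiGroupList keeps only the fields of a group list response that this sketch needs.
type apiGroupList struct {
	Groups []struct {
		Name     string `json:"name"`
		Versions []struct {
			GroupVersion string `json:"groupVersion"`
		} `json:"versions"`
	} `json:"groups"`
}

// groupVersions extracts every "group/version" string from the body and sorts them.
func groupVersions(body []byte) ([]string, error) {
	var list apiGroupList
	if err := json.Unmarshal(body, &list); err != nil {
		return nil, err
	}
	var out []string
	for _, g := range list.Groups {
		for _, v := range g.Versions {
			out = append(out, v.GroupVersion)
		}
	}
	sort.Strings(out)
	return out, nil
}

// sampleBody is hand-written illustration data, not taken from a real cluster.
const sampleBody = `{
  "kind": "APIGroupList",
  "groups": [
    {"name": "batch", "versions": [
      {"groupVersion": "batch/v1", "version": "v1"},
      {"groupVersion": "batch/v1beta1", "version": "v1beta1"}
    ]},
    {"name": "apps", "versions": [
      {"groupVersion": "apps/v1", "version": "v1"}
    ]}
  ]
}`

func main() {
	gvs, err := groupVersions([]byte(sampleBody))
	if err != nil {
		panic(err)
	}
	for _, gv := range gvs {
		fmt.Println(gv)
	}
}
```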
ScaleClient
todo