Last active
April 12, 2024 02:56
-
-
Save yin1999/7d168865300b5dfe374b0791152cff2c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"encoding/binary" | |
"fmt" | |
"log" | |
stdlog "log" | |
"math/rand" | |
"net" | |
"os" | |
"strings" | |
// import the resources to register them | |
_ "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" | |
_ "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" | |
_ "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" | |
_ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/tcp_proxy/v3" | |
"github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1" | |
"github.com/apache/dubbo-kubernetes/pkg/core" | |
"github.com/apache/dubbo-kubernetes/pkg/core/resources/model/rest/unversioned" | |
envoy_resource "github.com/envoyproxy/go-control-plane/pkg/resource/v3" | |
"github.com/pkg/errors" | |
"github.com/spf13/cobra" | |
"go.uber.org/multierr" | |
"google.golang.org/protobuf/encoding/protojson" | |
"google.golang.org/protobuf/proto" | |
"google.golang.org/protobuf/reflect/protoregistry" | |
"google.golang.org/protobuf/types/known/anypb" | |
rest_v1alpha1 "github.com/apache/dubbo-kubernetes/pkg/core/resources/model/rest/v1alpha1" | |
dubbo_log "github.com/apache/dubbo-kubernetes/pkg/log" | |
"github.com/apache/dubbo-kubernetes/tools/xds-client/stream" | |
) | |
func newRootCmd() *cobra.Command { | |
cmd := &cobra.Command{ | |
Use: "dubbo-xds-client", | |
Short: "dubbo xDS client", | |
Long: `dubbo xDS client.`, | |
PersistentPreRun: func(_ *cobra.Command, _ []string) { | |
core.SetLogger(core.NewLogger(dubbo_log.DebugLevel)) | |
}, | |
} | |
// sub-commands | |
cmd.AddCommand(newRunCmd()) | |
return cmd | |
} | |
func newRunCmd() *cobra.Command { | |
log := core.Log.WithName("dubbo-xds-client").WithName("run") | |
args := struct { | |
xdsServerAddress string | |
dps int | |
services int | |
inbounds int | |
outbounds int | |
}{ | |
xdsServerAddress: "grpc://localhost:5678", | |
dps: 1, | |
services: 1, | |
inbounds: 1, | |
outbounds: 1, | |
} | |
cmd := &cobra.Command{ | |
Use: "run", | |
Short: "Start xDS client(s) that simulate Envoy", | |
Long: `Start xDS client(s) that simulate Envoy.`, | |
RunE: func(cmd *cobra.Command, _ []string) error { | |
ipRand := rand.Uint32() // #nosec G404 -- that's just a test tool | |
log.Info("going to start xDS clients (Envoy simulators)", "dps", args.dps) | |
errCh := make(chan error, 1) | |
for i := 0; i < args.dps; i++ { | |
namespace := "dubbo-system" | |
name := "dubbo-samples-apiserver-consumer-59c775d8fd-wbdz8" | |
mesh := "default" | |
name = name + "." + namespace | |
id := mesh + "." + name | |
nodeLog := log.WithName("envoy-simulator").WithValues("idx", i, "ID", id) | |
nodeLog.Info("creating an xDS client ...") | |
go func(i int) { | |
buf := make([]byte, 4) | |
binary.LittleEndian.PutUint32(buf, ipRand+uint32(i)) | |
ip := net.IP(buf).String() | |
dpSpec := &v1alpha1.Dataplane{ | |
Networking: &v1alpha1.Dataplane_Networking{ | |
Address: ip, | |
}, | |
} | |
dp := &unversioned.Resource{ | |
Meta: rest_v1alpha1.ResourceMeta{Mesh: mesh, Name: name, Type: "Dataplane"}, | |
Spec: dpSpec, | |
} | |
errCh <- func() (errs error) { | |
client, err := stream.New(args.xdsServerAddress) | |
if err != nil { | |
return errors.Wrap(err, "failed to connect to xDS server") | |
} | |
defer func() { | |
nodeLog.Info("closing a connection ...") | |
if err := client.Close(); err != nil { | |
errs = multierr.Append(errs, errors.Wrapf(err, "failed to close a connection")) | |
} | |
}() | |
nodeLog.Info("opening an xDS stream ...") | |
stream, err := client.StartStream() | |
if err != nil { | |
return errors.Wrap(err, "failed to start an xDS stream") | |
} | |
defer func() { | |
nodeLog.Info("closing an xDS stream ...") | |
if err := stream.Close(); err != nil { | |
errs = multierr.Append(errs, errors.Wrapf(err, "failed to close an xDS stream")) | |
} | |
}() | |
nodeLog.Info("requesting Listeners") | |
e := stream.Request(id, envoy_resource.ListenerType, dp) | |
if e != nil { | |
return errors.Wrapf(e, "failed to request %q", envoy_resource.ListenerType) | |
} | |
nodeLog.Info("requesting Clusters") | |
e = stream.Request(id, envoy_resource.ClusterType, dp) | |
if e != nil { | |
return errors.Wrapf(e, "failed to request %q", envoy_resource.ClusterType) | |
} | |
nodeLog.Info("requesting Endpoints") | |
e = stream.Request(id, envoy_resource.EndpointType, dp) | |
if e != nil { | |
return errors.Wrapf(e, "failed to request %q", envoy_resource.EndpointType) | |
} | |
// nodeLog.Info("endpoint resources:\n", formatResources(dp.GetSpec().)) | |
for { | |
nodeLog.Info("waiting for a discovery response ...") | |
resp, err := stream.WaitForResources() | |
if err != nil { | |
return errors.Wrap(err, "failed to receive a discovery response") | |
} | |
stdlog.Print("received xDS resources ", " type ", resp.TypeUrl, " version ", resp.VersionInfo, " nonce ", resp.Nonce, " resources ", len(resp.Resources), " resources: ", formatResources(resp.Resources)) | |
if err := stream.ACK(resp.TypeUrl); err != nil { | |
return errors.Wrap(err, "failed to ACK a discovery response") | |
} | |
nodeLog.Info("ACKed discovery response", "type", resp.TypeUrl, "version", resp.VersionInfo, "nonce", resp.Nonce) | |
} | |
}() | |
}(i) | |
} | |
err := <-errCh | |
return errors.Wrap(err, "one of xDS clients (Envoy simulators) terminated with an error") | |
}, | |
} | |
// flags | |
cmd.PersistentFlags().StringVar(&args.xdsServerAddress, "xds-server-address", args.xdsServerAddress, "address of xDS server") | |
cmd.PersistentFlags().IntVar(&args.dps, "dps", args.dps, "number of dataplanes to emulate") | |
cmd.PersistentFlags().IntVar(&args.services, "services", args.services, "number of services") | |
cmd.PersistentFlags().IntVar(&args.inbounds, "inbounds", args.inbounds, "number of inbounds") | |
cmd.PersistentFlags().IntVar(&args.outbounds, "outbounds", args.outbounds, "number of outbounds") | |
return cmd | |
} | |
func formatResources(resources []*anypb.Any) string { | |
result := make([]string, 0, len(resources)) | |
for _, r := range resources { | |
// check the type of the resource is registered | |
tp, err := protoregistry.GlobalTypes.FindMessageByURL(r.GetTypeUrl()) | |
if err != nil { | |
log.Printf("failed to find the type of the resource (%s): %v", r.GetTypeUrl(), err) | |
continue | |
} | |
// unmarshal the resource | |
msg := tp.New().Interface() | |
if err := anypb.UnmarshalTo(r, msg, proto.UnmarshalOptions{}); err != nil { | |
log.Printf("failed to unmarshal the resource: %v", err) | |
continue | |
} | |
// format the resource | |
result = append(result, protojson.Format(msg)) | |
} | |
return strings.Join(result, " | ") | |
} | |
func main() { | |
if err := newRootCmd().Execute(); err != nil { | |
fmt.Println(err) | |
os.Exit(1) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment