https://github.com/envoyproxy/envoy/blob/master/source/docs/stats.md
- Counter, Gauge, Histogram
- hot restartをサポートするためRPCで親から子へ(以前は共有メモリに置いてて色々面倒だった)
- ThreadLocalで実装するためのThreadLocalStoreで実装
ThreadLocalStore
- スレッドの初期化前にも使えるようになってる
- Scopeはどのスレッドからでも作れるがmainからのみつくってる
- Scopeはどんなスレッドからでも参照、削除できる
- 各スレッドキャッシュを持ち、キャッシュがない場合はcentral cacheニトリに行く
- Scopeが破棄されると、キャッシュフラッシュ操作がすべてのスレッドにポストされ、破棄されたScopeが所有するキャッシュデータをフラッシュします。
- Scopeはキャッシュキーに一意のIDを使用し、古い削除されたScopeのキャッシュを参照しないようにする
- counters(), gauges() が呼ばれる時、Scopeが重複する可能性があるのでde-dupする (?)
Histogram threading
- main thread: ParentHistogram
- per-thread collector: ThreadLocalHistogram
- mainスレッドが全てのworkerにflushメッセージを通知, workerはactiveヒストグラムとbackupヒストグラムをswapする
- 各TLSヒストグラムはbackとforthスワッpする
- workerの処理が終わったらmainはmergeする
- 各ThreadLocalstoreはstat mapを持つ
- cacheのルックアップを安全にするため
[]
オペレーターではなく .findを使え
name
CLUSTER.upstream_cx_connect_attempts_exceeded
みたいな名前でクラスターごとフラットなstringで識別すると、文字列の組合せ爆発が起きる- 代わりにシンボル(symbolized)array。これと文字列との変換には、共有マップ内でのパースや解析をmutexで排他する必要がある
- ランタイムでのcpu負荷を減らすため、Symbolizationはスタートアップ時と、クラスター,ホストなどが変更された時にのみ行う
- StatNamePool, StatNameSetはこれらへのアクセスを便利にする君
- ランタイムで生成される名前もあるのでそれらはStatNameDynamicStorageにいれられる
Symbol Table Memory Layout
- 図参照
API
- systemのapiを抽象化したもの
- thread, process, time, file syste, stats store
- PlatformImpl() thread_factory と fiely_systemの実装
- posixは
Thread::ThreadFactoryImplPosix
Thread::ThreadFactoryImplPosix
に実装されてる
- posixは
Scheduler
- Dispatcher, Scheduler, TypeSystem
- LibEventScheduler: Schedulerをlibeventで実装したもの
- libenevt - epollやkqueue, select等沢山あるイベント監視系のAPIをラップし、シンプルに使えるようにしたイベント監視ライブラリ。マルチプラットフォーム対応
- libevent: event_base_loop()でブロックしてイベントループ実行 (LibEventScheduler::run())
- event_add()でタイマー登録、event_active()で即イベント発火 (TimerImpl)
- event_base が context的な扱いで、各スレッド(workerやmain)に一つずつ
- libenevt - epollやkqueue, select等沢山あるイベント監視系のAPIをラップし、シンプルに使えるようにしたイベント監視ライブラリ。マルチプラットフォーム対応
- DispatcherImpl
- イベント登録
- post(callback) -> キューに追加, runPostCallbacks()で実行される。スレッド間の通信に利用
- createTimer(cb, time) -> libeventのevent_add/active
- run()
- runPostCallbacks() でキューを空に
- LibEventScheduler::run() event_loop
- イベント登録
tls = ThreaLocalImpl
Main: MainCommonBase::run() -> server::run()
- RunHelper: signal listen
- watch dog timer, guard_dogが複数のwatch dogを管理する
- Dispather::run()
- notifyCallbacksForStage()ステージごとにコールバック呼ぶ。
- PostInit: init/ManagerImplからコールバックで呼ばれる。ManagerImplは登録されたfuncを実行してその間のstateを管理するだけ。PostInitの後startworkers()が呼ばれる
- StartUp: run()内でDispatcher::run()の前に呼ばれる
- startWorkers()
- ListnerManagerImpl::start_workers()
- ProdWorkerFactory, workerImpl: workerを作る
- ProdListenerComponentFactory: protoからxDSのネットワークフィルター系を作る, createListenSocket()でTCP/UDPソケット作成
- LibEventScheduler::run() event_loop
- ListnerManagerImpl::start_workers()
ListnereManager::start_workers()
- addListenerToWorker()でworkerにListenerを登録
- WorkerImpl::addListener -> ConnectionHandlerImpl::addListener -> ActiveTcpListner new -> tcpハンドラ登録
- udpはquickとraw
- worker->start()を呼ぶ
- workerImpl::start -> createThread()
ActiveTcpListener
- コンストラクタでdispatcher.createListener() -> ListenImpl::setupServerSocket -> evconnlistener_new でtcpのlistenハンドラ登録
- IoSocketHandlerImplはreadv/writev/sendmsg/recvmsgをラップしたもの。byte arrayの代わりにBuffer::RawSliceを使う
- 接続コールバックではActiveTcpListener::onAccept(AcceptedSocketImpl)へ->onAcceptWorker
- ActiveTcpSocketをnewして(Serverの)createListenerChainしてcontinueFilterChain()
- Network::ListenerFilter::onAccept()をイテレーションして呼ぶ
- 全て成功したらActiveTcpSocket::newConnection() 他のリスナーにリダイレクトする場合(findActiveTcpListenerByAddressで見つかった)はnew_listener.onAccept()にコールバック。それ以外はActiveTcpListener::newConnection()へコールバック
- ActiveTcpListener::newConnection()ではActiveTcpConnectionをactive_connectionsに追加する ここでTCPの初期化だいたいやる
- ActiveTcpConnection = TransportSocket + TcpSocket
- なんかこのコンストラクタでtcpの場合TCP_NODELAYオプションを設定してる。
- TransportSocketを新規に作る
- commonにRawBufferSocketFactory
- extentionsにquicやsslというったものがある
- ConnectionImplに新規のTransportSocketと、先ほど接続されたTcpソケットを渡す。
- dispatcher.createFileEventでReadyのコールバック登録。(file_event_implでlibeventを使いread/writeのイベントハンドラ登録) -> ConnectionImpl::onWriteReady() -> transport_socket->doWrite() -> 各TransportSocket実装へ -> onReadReady() -> transport_socket->doRead() -> 各transport socket実装へ -> onRead() -> filter_manager->onRead
- 同時にtransport_socketにもコールバックを登録。主にfdを渡すだけ(ioHandler()) RawBufferSocket(TransportSocket)::doRead(),doWrite() -> CoonectionImpl::IoHandler()でsocket_からfdを取得しread/write
- fileter_managerは onRead() onWrite()を持つ
- ConncectionImpl::write(), rawWrite(), -> バッファに書いて filter_manager->onWrite()
- config->filterChainFactory->createNetworkFilterChaion() -> ConnectionImplはFilterManagerを実装している。各FilterFactoryがFilterManager::add(Read/Write)Filterしていく
- ActiveTcpConnection = TransportSocket + TcpSocket
FilterManagerImpl
- ActiveReadFilter, ActiveWriteFilterというinner struct
- ReadFilter -> upstream_filters
- WriteFilter -> downstream_filters
- add時に filter実装の setTransportSocketCallbacks をコールバックする
- FilterStatusというものを持っている (Continue, Stopだけ) onWriteでstatusを返す
- onRead
- upstream_filtersをiterateして
- 初期化されてなければ filter->onNewConnectionをコールバック
- read bufferをfilter->onData()にコールバック
- onWrite
- downstream_filtersをiterateして
- write bufferをfilter->onWrite()にコールバック
Buffer::RawSlice
- 何が嬉しい?
ListenerManager
- active_listenersでListnerImpl(ActiveTCPListenerとか)のListを管理
- addOrUpdateListener()でadtive_listneresを登録
- configuration_impl MainImpl::initialize()から呼ばれる
- LDSApiImpl::onConfigUpdate()から呼ばれる
- こちらでは ListnerManager::beginListnerUpdate() -> remove/addOrUpdateListner() -> endListenerUpdate() と操作する(内部のエラー状態をLDS側で取得するため)
- addOrUpdateListener()
- active_listeners, warm_listenersから現在のlistenerインスタンスを取得(existing_warm_listeners, existing_active_listner)
- Configからとったhash値を持ってて、排他に使ってる
- ListnerImplをnewする
- 名前が同じでaddressが違うやつを弾いたりする
- existing_warmにある場合: new listnereで上書きする
- existing_listnerにある場合: warming listenerに追加する(workerがまだ開始してない場合はそのままactive_listner上書き)
- 新規の場合: なんか削除して同じ名前で違うのが追加されたケースとか考慮されてて大変そう。drain_listneresというやつの中に同じアドレスが無いかチェック。あればそのlistnerのsocketを使う。なければアドレス名からSocketFactoryを生成してlitener->SetSockerFactory。こちらも同じくworkerがスタートしてたらwarmingに追加する
- 最後にListnerImplのinitialize()
- InitManager(ManagerImpl)のinitializeを実行する
- active_listeners, warm_listenersから現在のlistenerインスタンスを取得(existing_warm_listeners, existing_active_listner)
- removeListener()
- TODO
- ListnerImplのInitManagerとInitWatcher
- parent(ListnerManager)のonListenerWarmedをWatcherHandlerとして登録
- InitManagerは1つのready()と複数のtarget_handlersを管理できるようだ
- onListenerWarmed()
- addListenerToWorker()->worker.addListener()でワーカーのイベントハンドラを更新、あとはActiveTcpListener newとかへ
- warmedからactive listneresへ移動
WorkerImpl
- start()前にaddListener()->ConnectinoHandlerImpl::addListener()でイベントハンドラが設定される
- start()でスレッドを作成。
- threadRoutine()
- Dispather->run() -> event_base_loop
LDSAPIImpl
- Server InstanceImpl::initialize() 内でListenManager::createLdsAPI()で作成されListenerManagerにそのまま保存
- ListenerComponentFactory::createLDSApi() -> LdsApiImpl
- LdsApiImplコンストラクタでInitManagerのtargetハンドラにsubsription_->start()
- subscriptionとはClusterManagerのイベントをサブスクライブするくんらしい。subscriptionFromConfigSource()の中でConfigに応じてGrpcSubscriptionImplとかのインスタンスを返す。
- onConfigurUPdate: LDS resourceが更新された時のハンドラ listenermangerのaddOrUpdateListnere()へ
GrpcSubscriptionImpl (xDSのgRPC実装)
- GrpcSubscriptionImpl::start() type_urlでどのxDSかを指定
- GrpcMuxImpl::subscribe() api_stateにtype_urlごとにコールバック(watches_)を登録する
- ApiState: type_urlごとにDiscoverRequestの状態を管理している
- GrpcMuxImpl: GrpcStream<DiscoveryRequest, DiscoveryResponse>を持ってる
- start() -> establishNewStream -> onEstablish() -> DiscoverRequestを送信する
- onDisvoeryResponse()
- messageの内容からリソースの差分を見つける
- 初回(今の状態がempty())はcallback_.onConfigUpdate(resource)で全部渡す、存在する奴はMerge()してonConfigUpdate()リソースごとに
- レスポンスの処理が終わったらrequest_.set_version_infoでmessageのバージョンをセーブする、さらに処理に失敗成功関わらず、nonceは保存する。
- またdiscovery request飛ばす
ClusterManager
- TODO
LDS登録からfilter_managerへの追加まで
- MainImpl::initialize(): boostrap.static_resources().listneres()、あるいはLDSAPIから..
- ListenerImplにそのままprotoのconfigが渡る。コンストラクタ内でListnerManager::factory_.createListenereFactoryList()が呼ばれる
- これはServer::createListenerFilterFactoryList -> ProdListenerComponentFactory::createListenerFilterFactoryList_()
- Config::Utility::getAndCheckFactoryConfiguration::NamedListenerFilterConfigFactory(proto_config);
- staticなactoryRegisteryからとってくる。REGISTER_FACTORYマクロで登録
- 例えばHTTPInspectorとか https://www.envoyproxy.io/docs/envoy/latest/configuration/listeners/listener_filters/http_inspector
- createFilterFactoryFromProto内でListnerFilterManagerのaddAcceptFilter()を読んでいる
- HTTPInspectorFilter::Filter().onAccept()が登録される
- ここではsyscall.recvでPEEKしてバッファを少し読み、parseHTTPHeaderでヘッダの中身を解析、statsを更新。
ClusterManager
- RunHelper -> ProdClusterManagerFactory::clusterManagerFromProt()で生成
- ClusterManagerImplコンストラクタ内で
- static_resourcesのcluster -> EDSだったら loadCluster()
- clusterFromProto()->ClusterFactoryImplBase::create() -> ClusterFactoryRegisterにクラスタータイプごとに登録されている
- 例: EDSClusterFactoryImpl -> new EDSClusterImpl
- clusterManager->subscriptionFactory->subscriptionFromConfigSourceで生成したsubscription(LDS APIt同じgRPCのイベントハンドラ)を保持
- EDSClusterImpl::startPreInit()でsubscription->start()する
- onConfigUpdate()
- dynamic_resources.CDS configがあったら -> init_helper_.setCds(cds api)