從 Kubernetes 學習 Go 接口封裝
在 Go 項目開發中,為了提高代碼的可讀性、可維護性和可測試性,合理的封裝和抽象至關重要。本文將介紹幾種常見的封裝策略,包括使用接口隱藏輸入參數細節、方便 Mock 測試的接口抽象、多種底層實現的接口封裝,以及對協程異常處理、WaitGroup 使用和基于信號量觸發邏輯的封裝實踐。通過這些技巧,可以讓各層代碼只關注自身職責,實現低耦合、高復用的設計。
使用接口隱藏輸入參數細節
當一個方法的輸入參數是結構體時,內部調用會暴露過多的細節。此時,可以將輸入隱式轉換為接口,使內部實現僅能看到所需的方法。
type Kubelet struct{}
func (kl *Kubelet) HandlePodAdditions(pods []*Pod) {
for _, pod := range pods {
fmt.Printf("create pods : %s\n", pod.Status)
}
}
func (kl *Kubelet) Run(updates <-chan Pod) {
fmt.Println(" run kubelet")
go kl.syncLoop(updates, kl)
}
func (kl *Kubelet) syncLoop(updates <-chan Pod, handler SyncHandler) {
for {
select {
case pod := <-updates:
handler.HandlePodAdditions([]*Pod{&pod})
}
}
}
type SyncHandler interface {
HandlePodAdditions(pods []*Pod)
}
在這里,Kubelet 本身有多個方法:
- syncLoop:用于狀態同步的循環;
- Run:啟動監聽循環;
- HandlePodAdditions:處理 Pod 添加的邏輯。
由于 syncLoop 并不需要訪問 kubelet 的其他方法,我們定義了 SyncHandler 接口,讓 kubelet 實現該接口,并將 kubelet 作為 SyncHandler 傳入 syncLoop,這樣 kubelet 會被類型轉換為 SyncHandler。
轉換后,syncLoop 的參數中將不再暴露 kubelet 的其他方法,使你在編寫 syncLoop 時更專注于內部邏輯。
但這種做法也可能帶來問題:初始抽象可能能滿足第一版需求,但隨著需求增長,如果需要在 syncLoop 中調用接口未包含的 kubelet 方法,就必須要么顯式傳入 kubelet,要么擴展接口,這兩種方式都會增加編碼成本并破壞原有封裝。
分層封裝與隱藏是設計目標,讓代碼的每一部分只關注自身職責。
便于 Mock 測試的接口封裝
通過接口抽象,我們可以在測試時直接實例化 mock 結構體,用于無需關注的部分。
type OrderAPI interface {
GetOrderId() string
}
type realOrderImpl struct{}
func (r *realOrderImpl) GetOrderId() string {
return ""
}
type mockOrderImpl struct{}
func (m *mockOrderImpl) GetOrderId() string {
return "mock"
}
這里如果在測試時不關心 GetOrderId 是否正常工作,就可以直接用 mockOrderImpl 初始化 OrderAPI,且 mock 中的邏輯可根據需要任意復雜化。
func TestGetOrderId(t *testing.T) {
orderAPI := &mockOrderImpl{} // 如果我們需要獲取訂單 ID,但這不是測試的重點,只需用 mock 結構體初始化
fmt.Println(orderAPI.GetOrderId())
}
gomonkey 也可以用于測試注入,因此即使現有代碼沒有通過接口封裝,我們仍能實現 mock,而且這種方式更為強大。
patches := gomonkey.ApplyFunc(GetOrder, func(orderId string) Order {
return Order{
OrderId: orderId,
OrderState: delivering,
}
})
return func() {
patches.Reset()
}
使用 gomonkey 可以實現更靈活的 mock,因為它可以直接設置函數的返回值,而接口抽象只能處理由結構體實例化的內容。
多種底層實現的接口封裝
像 iptables 和 ipvs 這樣的實現是通過接口抽象來完成的,因為所有網絡設置都需要同時處理 Service 和 Endpoint。因此,它們抽象出了 ServiceHandler 和 EndpointSliceHandler:
// ServiceHandler 是用于接收 Service 對象變更通知的抽象接口
type ServiceHandler interface {
// 當檢測到新的 Service 對象被創建時調用
OnServiceAdd(service *v1.Service)
// 當檢測到已有 Service 對象被修改時調用
OnServiceUpdate(oldService, service *v1.Service)
// 當檢測到已有 Service 對象被刪除時調用
OnServiceDelete(service *v1.Service)
// 當所有初始事件處理完成且狀態已完全同步到本地緩存后調用
OnServiceSynced()
}
// EndpointSliceHandler 是用于接收 EndpointSlice 對象變更通知的抽象接口
type EndpointSliceHandler interface {
// 當檢測到新的 EndpointSlice 對象被創建時調用
OnEndpointSliceAdd(endpointSlice *discoveryv1.EndpointSlice)
// 當檢測到已有 EndpointSlice 對象被修改時調用
OnEndpointSliceUpdate(oldEndpointSlice, newEndpointSlice *discoveryv1.EndpointSlice)
// 當檢測到已有 EndpointSlice 對象被刪除時調用
OnEndpointSliceDelete(endpointSlice *discoveryv1.EndpointSlice)
// 當所有初始事件處理完成且狀態已完全同步到本地緩存后調用
OnEndpointSlicesSynced()
}
然后可以通過 Provider 注入:
type Provider interface {
config.EndpointSliceHandler
config.ServiceHandler
}
這也是我在編寫組件時最常用的編碼技巧:通過對相似操作的抽象,上層代碼在替換底層實現后無需做任何改動。
封裝異常處理
如果我們在啟動 goroutine 后不捕獲異常,異常會導致該 goroutine 直接 panic。但是每次都寫全局的 recover 邏輯并不優雅,因此我們可以使用封裝好的 HandleCrash 方法:
package runtime
var (
ReallyCrash = true
)
// 默認的全局 Panic 處理器
var PanicHandlers = []func(interface{}){logPanic}
// 支持從外部傳入額外的自定義 panic 處理器
func HandleCrash(additionalHandlers ...func(interface{})) {
if r := recover(); r != nil {
for _, fn := range PanicHandlers {
fn(r)
}
for _, fn := range additionalHandlers {
fn(r)
}
if ReallyCrash {
panic(r)
}
}
}
這既支持內部異常處理,也支持外部注入額外的處理器。如果不想讓程序崩潰,可以根據需要修改邏輯。
package runtime
func Go(fn func()) {
go func() {
defer HandleCrash()
fn()
}()
}
在啟動 goroutine 時,可以使用 Go 方法,這樣也能避免忘記添加 panic 處理。
封裝 WaitGroup
import "sync"
type Group struct {
wg sync.WaitGroup
}
func (g *Group) Wait() {
g.wg.Wait()
}
func (g *Group) Start(f func()) {
g.wg.Add(1)
go func() {
defer g.wg.Done()
f()
}()
}
這里最重要的是 Start 方法,它在內部封裝了 Add 和 Done。雖然只有幾行代碼,但它確保每次使用 WaitGroup 時,都不會忘記增加或完成計數器。
封裝由信號量觸發的邏輯
type BoundedFrequencyRunner struct {
sync.Mutex
// Actively triggered
run chan struct{}
// Timer limit
timer *time.Timer
// The actual logic to execute
fn func()
}
func NewBoundedFrequencyRunner(fn func()) *BoundedFrequencyRunner {
return &BoundedFrequencyRunner{
run: make(chan struct{}, 1),
fn: fn,
timer: time.NewTimer(0),
}
}
// Run triggers execution; only one signal can be written here, additional signals are discarded without blocking. You can increase the queue size as needed.
func (b *BoundedFrequencyRunner) Run() {
select {
case b.run <- struct{}{}:
fmt.Println("Signal written successfully")
default:
fmt.Println("Signal already triggered once, discarding")
}
}
func (b *BoundedFrequencyRunner) Loop() {
b.timer.Reset(time.Second * 1)
for {
select {
case <-b.run:
fmt.Println("Run signal triggered")
b.tryRun()
case <-b.timer.C:
fmt.Println("Timer triggered execution")
b.tryRun()
}
}
}
func (b *BoundedFrequencyRunner) tryRun() {
b.Lock()
defer b.Unlock()
// You can add logic here such as rate limiting
b.timer.Reset(time.Second * 1)
b.fn()
}