Handle failover scenarios (#320)

Parent: 6095bb33e6
Commit: ea70592862

npm/npm.go | 44
@@ -10,6 +10,7 @@ import (
 	"time"

 	"github.com/Azure/azure-container-networking/log"
+	"github.com/Azure/azure-container-networking/npm/iptm"
 	"github.com/Azure/azure-container-networking/npm/util"
 	"github.com/Azure/azure-container-networking/telemetry"
 	corev1 "k8s.io/api/core/v1"

@@ -27,6 +28,9 @@ const (
 	hostNetAgentURLForNpm           = "http://168.63.129.16/machine/plugins?comp=netagent&type=npmreport"
 	contentType                     = "application/json"
 	telemetryRetryWaitTimeInSeconds = 60
+	restoreRetryWaitTimeInSeconds   = 5
+	restoreMaxRetries               = 10
+	backupWaitTimeInSeconds         = 60
 )

 // NetworkPolicyManager contains informers for pod, namespace and networkpolicy.
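Taken together, these constants bound the failover behaviour: restore() retries every 5 seconds up to 10 times, i.e. it waits at most 10 × 5 = 50 seconds before declaring a timeout, while backup() snapshots the filter table once every 60 seconds.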
@@ -101,11 +105,45 @@ func (npMgr *NetworkPolicyManager) UpdateAndSendReport(err error, eventMsg string) error {
 	return npMgr.reportManager.SendReport(telemetryBuffer)
 }

-// Run starts shared informers and waits for the shared informer cache to sync.
-func (npMgr *NetworkPolicyManager) Run(stopCh <-chan struct{}) error {
+// restore restores iptables from the backup file.
+func (npMgr *NetworkPolicyManager) restore() {
+	iptMgr := iptm.NewIptablesManager()
+	var err error
+	for i := 0; i < restoreMaxRetries; i++ {
+		if err = iptMgr.Restore(util.IptablesConfigFile); err == nil {
+			return
+		}
+
+		time.Sleep(restoreRetryWaitTimeInSeconds * time.Second)
+	}
+
+	log.Printf("Timeout restoring Azure-NPM states")
+	panic(err.Error())
+}
+
+// backup takes snapshots of the iptables filter table and saves them periodically.
+func (npMgr *NetworkPolicyManager) backup() {
+	iptMgr := iptm.NewIptablesManager()
+	var err error
+	for {
+		time.Sleep(backupWaitTimeInSeconds * time.Second)
+
+		if err = iptMgr.Save(util.IptablesConfigFile); err != nil {
+			log.Printf("Error backing up Azure-NPM states")
+		}
+	}
+}
+
+// Start starts shared informers and waits for the shared informer cache to sync.
+func (npMgr *NetworkPolicyManager) Start(stopCh <-chan struct{}) error {
 	// Starts all informers manufactured by npMgr's informerFactory.
 	npMgr.informerFactory.Start(stopCh)
+
+	// Failure detected. Needs to restore Azure-NPM related iptables entries.
+	if util.Exists(util.IptablesConfigFile) {
+		npMgr.restore()
+	}
+
 	// Wait for the initial sync of local cache.
 	if !cache.WaitForCacheSync(stopCh, npMgr.podInformer.Informer().HasSynced) {
 		return fmt.Errorf("Pod informer failed to sync")
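Two things in the hunk above are worth calling out. First, restore() panics once its retry budget is exhausted rather than limping on with a possibly empty filter table; presumably the supervisor running the NPM process (kubelet, in a typical Kubernetes deployment — not shown in this diff) then restarts it for a fresh attempt. Second, backup() only snapshots the filter table every backupWaitTimeInSeconds, so rules written in the final minute before a crash will not be in the backup file; recovering those relies on the informers resyncing and reapplying policies, which happens outside this diff.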
@@ -119,6 +157,8 @@ func (npMgr *NetworkPolicyManager) Run(stopCh <-chan struct{}) error {
 		return fmt.Errorf("Namespace informer failed to sync")
 	}

+	go npMgr.backup()
+
 	return nil
 }

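The diff calls iptm.Save and iptm.Restore, but their implementations are not part of this page. As a rough, self-contained sketch of what such helpers amount to, assuming they wrap the stock iptables-save/iptables-restore binaries (the names saveIptables/restoreIptables and the file mode are illustrative, not the actual iptm API):

package iptmsketch

import (
	"os"
	"os/exec"
)

// saveIptables dumps the current iptables state to configFile,
// the moral equivalent of `iptables-save > configFile`.
func saveIptables(configFile string) error {
	out, err := exec.Command("iptables-save").Output()
	if err != nil {
		return err
	}
	return os.WriteFile(configFile, out, 0644)
}

// restoreIptables reloads a previously saved state, the moral
// equivalent of `iptables-restore < configFile`.
func restoreIptables(configFile string) error {
	f, err := os.Open(configFile)
	if err != nil {
		return err
	}
	defer f.Close()

	cmd := exec.Command("iptables-restore")
	cmd.Stdin = f
	return cmd.Run()
}

Under that reading, backup() drives the save path on its 60-second timer, and the presence of the config file at startup is what Start takes as the signal that the previous run failed.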
@@ -57,7 +57,7 @@ func main() {
 	factory := informers.NewSharedInformerFactory(clientset, time.Hour*24)

 	npMgr := npm.NewNetworkPolicyManager(clientset, factory, version)
-	err = npMgr.Run(wait.NeverStop)
+	err = npMgr.Start(wait.NeverStop)
 	if err != nil {
 		log.Printf("[Azure-NPM] npm failed with error %v.", err)
 		panic(err.Error())
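On the caller side only the rename matters: main now invokes Start instead of Run, and thereby picks up the restore-on-startup check and the background backup loop added above.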
@@ -5,6 +5,7 @@ package util
 import (
 	"fmt"
 	"hash/fnv"
+	"os"
 	"strconv"
 	"strings"

@@ -14,6 +15,17 @@ import (
 // IsNewNwPolicyVerFlag indicates if the current kubernetes version is newer than 1.11 or not
 var IsNewNwPolicyVerFlag = false

+// Exists reports whether the named file or directory exists.
+func Exists(filePath string) bool {
+	if _, err := os.Stat(filePath); err == nil {
+		return true
+	} else if !os.IsNotExist(err) {
+		return true
+	}
+
+	return false
+}
+
 // GetClusterID retrieves cluster ID through node name. (Azure-specific)
 func GetClusterID(nodeName string) string {
 	s := strings.Split(nodeName, "-")
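Note that Exists deliberately treats any stat error other than "does not exist" (for example a permission failure) as "the file may exist", which makes the restore-on-startup check conservative. A minimal standalone sketch with hypothetical paths, runnable outside the repo:

package main

import (
	"fmt"
	"os"
)

// exists mirrors util.Exists from the diff above: only a confirmed
// os.IsNotExist error yields false; any other stat failure (permissions,
// I/O errors) is treated as "possibly there".
func exists(filePath string) bool {
	if _, err := os.Stat(filePath); err == nil {
		return true
	} else if !os.IsNotExist(err) {
		return true
	}
	return false
}

func main() {
	// Hypothetical example paths, not taken from the commit.
	fmt.Println(exists("/etc/hosts"))    // true on a typical Linux node
	fmt.Println(exists("/no/such/path")) // false
}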