Skip to content

Instantly share code, notes, and snippets.

@dbellotti
Last active February 17, 2016 01:20
Show Gist options
  • Save dbellotti/105e6a7f02c11d1b0166 to your computer and use it in GitHub Desktop.
Save dbellotti/105e6a7f02c11d1b0166 to your computer and use it in GitHub Desktop.
package main
import (
"bufio"
"fmt"
"os"
"os/exec"
"path"
"runtime"
"sync"
"syscall"
"time"
"github.com/pivotal-golang/lager"
"github.com/vishvananda/netlink"
"github.com/vishvananda/netlink/nl"
"github.com/cloudfoundry-incubator/ducati-daemon/lib/links"
"github.com/cloudfoundry-incubator/ducati-daemon/lib/namespace"
ducati_nl "github.com/cloudfoundry-incubator/ducati-daemon/lib/nl"
)
func main() {
logger := lager.NewLogger("ducati-d")
logger = logger.Session("", lager.Data{"pid": os.Getpid()})
logger.RegisterSink(lager.NewWriterSink(os.Stdout, lager.DEBUG))
for j := 0; j < 10; j++ {
wg := sync.WaitGroup{}
wg.Add(100)
for i := 0; i < 100; i++ {
iter := j*100 + i
commands := []Command{
CreateNamespace{
Namespace: fmt.Sprintf("container-%d", iter),
},
CreateVethPair{
Master: fmt.Sprintf("host-%d", iter),
Slave: fmt.Sprintf("eth%d", iter),
Namespace: fmt.Sprintf("container-%d", iter),
},
MoveInterface{
SourceNamespace: fmt.Sprintf("container-%d", iter),
TargetNamespace: "",
InterfaceName: fmt.Sprintf("host-%d", iter),
},
}
go func() {
runtime.LockOSThread()
logger.Debug("starting", lager.Data{"iter": iter})
err := execute(commands)
if err != nil {
logger.Fatal("execute-failed", err)
}
wg.Done()
}()
}
wg.Wait()
}
fmt.Printf("\n\n\n---- Press Enter ---\n\n\n")
reader := bufio.NewReader(os.Stdin)
reader.ReadLine()
// namespacePath := os.Args[1]
// ns := namespace.NewNamespace(namespacePath)
// misses := make(chan *netlink.Neigh, 100)
// logger.Debug("Creating netlink socket")
// var sock *nl.NetlinkSocket
// ns.Execute(func(_ *os.File) error {
// logger.Debug("Subscribing to socket")
// sock = subscribe(logger)
// return nil
// })
// ch := make(chan struct{}, 1)
// logger.Debug("Monitoring misses", lager.Data{"socket": fmt.Sprintf("%+v", sock)})
// go monitorMisses(logger, misses, sock)
// <-ch
}
type Command interface {
Do() error
}
type CreateNamespace struct {
Namespace string
}
func (c CreateNamespace) Do() error {
return exec.Command("/sbin/ip", "netns", "add", c.Namespace).Run()
}
type CreateVethPair struct {
Master string
Slave string
Namespace string
}
func (c CreateVethPair) Do() error {
ns := namespace.NewNamespace(path.Join("/var/run/netns", c.Namespace))
return ns.Execute(func(_ *os.File) error {
linkFactory := &links.Factory{Netlinker: ducati_nl.Netlink}
_, _, err := linkFactory.CreateVethPair(c.Master, c.Slave, links.VxlanVethMTU)
return err
})
}
type MoveInterface struct {
SourceNamespace string
TargetNamespace string
InterfaceName string
}
func (c MoveInterface) Do() error {
ns := namespace.NewNamespace(path.Join("/var/run/netns", c.SourceNamespace))
file, err := os.Open("/proc/1/ns/net")
if err != nil {
return err
}
defer file.Close()
return ns.Execute(func(_ *os.File) error {
linkFactory := &links.Factory{Netlinker: ducati_nl.Netlink}
link, err := linkFactory.FindLink(c.InterfaceName)
if err != nil {
return err
}
return ducati_nl.Netlink.LinkSetNsFd(link, int(file.Fd()))
})
}
func execute(commands []Command) error {
for _, c := range commands {
if err := c.Do(); err != nil {
return fmt.Errorf("command %+v failed: %s", c, err)
}
}
return nil
}
func subscribe(logger lager.Logger) *nl.NetlinkSocket {
nlsock, err := nl.Subscribe(syscall.NETLINK_ROUTE, syscall.RTNLGRP_NEIGH)
if err != nil {
logger.Error("Failed to subscribe to netlink RTNLGRP_NEIGH messages", err)
return nil
}
return nlsock
}
func monitorMisses(logger lager.Logger, misses chan *netlink.Neigh, nlsock *nl.NetlinkSocket) {
for {
msgs, err := nlsock.Receive()
if err != nil {
logger.Error("Failed to receive from netlink", err)
time.Sleep(1 * time.Second)
continue
}
for _, msg := range msgs {
processNeighMsg(logger, msg, misses)
}
}
}
func isNeighResolving(state int) bool {
return (state & (netlink.NUD_INCOMPLETE | netlink.NUD_STALE | netlink.NUD_DELAY | netlink.NUD_PROBE)) != 0
}
type neigh netlink.Neigh
func (n *neigh) String() string {
var readableState string
if n.State&netlink.NUD_INCOMPLETE != 0 {
readableState = " | " + "INCOMPLETE"
}
if n.State&netlink.NUD_REACHABLE != 0 {
readableState = " | " + "REACHABLE"
}
if n.State&netlink.NUD_STALE != 0 {
readableState = " | " + "STALE"
}
if n.State&netlink.NUD_DELAY != 0 {
readableState = " | " + "DELAY"
}
if n.State&netlink.NUD_PROBE != 0 {
readableState = " | " + "PROBE"
}
if n.State&netlink.NUD_FAILED != 0 {
readableState = " | " + "FAILED"
}
if n.State&netlink.NUD_NOARP != 0 {
readableState = " | " + "NOARP"
}
if n.State&netlink.NUD_PERMANENT != 0 {
readableState = " | " + "PERMANENT"
}
return fmt.Sprintf(
"LinkIndex: %d, Family: %d, State: %s, Type: %d, Flags: %d, IP: %s, HardwareAddr: %s",
int(n.LinkIndex),
int(n.Family),
readableState,
int(n.Type),
int(n.Flags),
n.IP.String(),
n.HardwareAddr.String(),
)
}
func processNeighMsg(logger lager.Logger, msg syscall.NetlinkMessage, misses chan *netlink.Neigh) {
n, err := netlink.NeighDeserialize(msg.Data)
if err != nil {
logger.Error("Failed to deserialize netlink ndmsg", err)
return
}
myNeigh := neigh(*n)
logger.Debug("netlink-neighbor-message", lager.Data{"msg": fmt.Sprintf("%s", myNeigh.String())})
if msg.Header.Type != syscall.RTM_GETNEIGH && msg.Header.Type != syscall.RTM_NEWNEIGH {
return
}
if !isNeighResolving(n.State) {
// misses come with NUD_STALE bit set
return
}
misses <- n
}
package main
import (
"bufio"
"fmt"
"os"
"os/exec"
"path"
"runtime"
"sync"
"syscall"
"time"
"github.com/pivotal-golang/lager"
"github.com/vishvananda/netlink"
"github.com/vishvananda/netlink/nl"
"github.com/cloudfoundry-incubator/ducati-daemon/lib/links"
"github.com/cloudfoundry-incubator/ducati-daemon/lib/namespace"
ducati_nl "github.com/cloudfoundry-incubator/ducati-daemon/lib/nl"
)
func main() {
logger := lager.NewLogger("ducati-d")
logger = logger.Session("", lager.Data{"pid": os.Getpid()})
logger.RegisterSink(lager.NewWriterSink(os.Stdout, lager.DEBUG))
loops := 1000
done := make(chan struct{})
wg := sync.WaitGroup{}
wg.Add(loops)
for i := 0; i < loops; i++ {
iter := i
commands := []Command{
CreateNamespace{
Namespace: fmt.Sprintf("container-%d", iter),
},
CreateVethPair{
Master: fmt.Sprintf("host-%d", iter),
Slave: fmt.Sprintf("eth%d", iter),
Namespace: fmt.Sprintf("container-%d", iter),
},
MoveInterface{
SourceNamespace: fmt.Sprintf("container-%d", iter),
TargetNamespace: "",
InterfaceName: fmt.Sprintf("host-%d", iter),
},
}
go func() {
runtime.LockOSThread()
logger.Debug("starting", lager.Data{"iter": iter})
err := execute(commands)
if err != nil {
logger.Fatal("execute-failed", err)
}
wg.Done()
<-done
}()
}
wg.Wait()
fmt.Printf("\n\n\n---- Press Enter ---\n\n\n")
reader := bufio.NewReader(os.Stdin)
reader.ReadLine()
close(done)
fmt.Printf("\n\n\n---- Press Enter ---\n\n\n")
reader.ReadLine()
// namespacePath := os.Args[1]
// ns := namespace.NewNamespace(namespacePath)
// misses := make(chan *netlink.Neigh, 100)
// logger.Debug("Creating netlink socket")
// var sock *nl.NetlinkSocket
// ns.Execute(func(_ *os.File) error {
// logger.Debug("Subscribing to socket")
// sock = subscribe(logger)
// return nil
// })
// ch := make(chan struct{}, 1)
// logger.Debug("Monitoring misses", lager.Data{"socket": fmt.Sprintf("%+v", sock)})
// go monitorMisses(logger, misses, sock)
// <-ch
}
type Command interface {
Do() error
}
type CreateNamespace struct {
Namespace string
}
func (c CreateNamespace) Do() error {
return exec.Command("/sbin/ip", "netns", "add", c.Namespace).Run()
}
type CreateVethPair struct {
Master string
Slave string
Namespace string
}
func (c CreateVethPair) Do() error {
ns := namespace.NewNamespace(path.Join("/var/run/netns", c.Namespace))
return ns.Execute(func(_ *os.File) error {
linkFactory := &links.Factory{Netlinker: ducati_nl.Netlink}
_, _, err := linkFactory.CreateVethPair(c.Master, c.Slave, links.VxlanVethMTU)
return err
})
}
type MoveInterface struct {
SourceNamespace string
TargetNamespace string
InterfaceName string
}
func (c MoveInterface) Do() error {
ns := namespace.NewNamespace(path.Join("/var/run/netns", c.SourceNamespace))
file, err := os.Open("/proc/1/ns/net")
if err != nil {
return err
}
defer file.Close()
return ns.Execute(func(_ *os.File) error {
linkFactory := &links.Factory{Netlinker: ducati_nl.Netlink}
link, err := linkFactory.FindLink(c.InterfaceName)
if err != nil {
return err
}
return ducati_nl.Netlink.LinkSetNsFd(link, int(file.Fd()))
})
}
func execute(commands []Command) error {
for _, c := range commands {
if err := c.Do(); err != nil {
return fmt.Errorf("command %+v failed: %s", c, err)
}
}
return nil
}
func subscribe(logger lager.Logger) *nl.NetlinkSocket {
nlsock, err := nl.Subscribe(syscall.NETLINK_ROUTE, syscall.RTNLGRP_NEIGH)
if err != nil {
logger.Error("Failed to subscribe to netlink RTNLGRP_NEIGH messages", err)
return nil
}
return nlsock
}
func monitorMisses(logger lager.Logger, misses chan *netlink.Neigh, nlsock *nl.NetlinkSocket) {
for {
msgs, err := nlsock.Receive()
if err != nil {
logger.Error("Failed to receive from netlink", err)
time.Sleep(1 * time.Second)
continue
}
for _, msg := range msgs {
processNeighMsg(logger, msg, misses)
}
}
}
func isNeighResolving(state int) bool {
return (state & (netlink.NUD_INCOMPLETE | netlink.NUD_STALE | netlink.NUD_DELAY | netlink.NUD_PROBE)) != 0
}
type neigh netlink.Neigh
func (n *neigh) String() string {
var readableState string
if n.State&netlink.NUD_INCOMPLETE != 0 {
readableState = " | " + "INCOMPLETE"
}
if n.State&netlink.NUD_REACHABLE != 0 {
readableState = " | " + "REACHABLE"
}
if n.State&netlink.NUD_STALE != 0 {
readableState = " | " + "STALE"
}
if n.State&netlink.NUD_DELAY != 0 {
readableState = " | " + "DELAY"
}
if n.State&netlink.NUD_PROBE != 0 {
readableState = " | " + "PROBE"
}
if n.State&netlink.NUD_FAILED != 0 {
readableState = " | " + "FAILED"
}
if n.State&netlink.NUD_NOARP != 0 {
readableState = " | " + "NOARP"
}
if n.State&netlink.NUD_PERMANENT != 0 {
readableState = " | " + "PERMANENT"
}
return fmt.Sprintf(
"LinkIndex: %d, Family: %d, State: %s, Type: %d, Flags: %d, IP: %s, HardwareAddr: %s",
int(n.LinkIndex),
int(n.Family),
readableState,
int(n.Type),
int(n.Flags),
n.IP.String(),
n.HardwareAddr.String(),
)
}
func processNeighMsg(logger lager.Logger, msg syscall.NetlinkMessage, misses chan *netlink.Neigh) {
n, err := netlink.NeighDeserialize(msg.Data)
if err != nil {
logger.Error("Failed to deserialize netlink ndmsg", err)
return
}
myNeigh := neigh(*n)
logger.Debug("netlink-neighbor-message", lager.Data{"msg": fmt.Sprintf("%s", myNeigh.String())})
if msg.Header.Type != syscall.RTM_GETNEIGH && msg.Header.Type != syscall.RTM_NEWNEIGH {
return
}
if !isNeighResolving(n.State) {
// misses come with NUD_STALE bit set
return
}
misses <- n
}
package main
import (
"fmt"
"os"
"syscall"
"time"
"github.com/pivotal-golang/lager"
"github.com/vishvananda/netlink"
"github.com/vishvananda/netlink/nl"
"github.com/cloudfoundry-incubator/ducati-daemon/lib/namespace"
)
func main() {
logger := lager.NewLogger("ducati-d")
logger = logger.Session("", lager.Data{"pid": os.Getpid()})
logger.RegisterSink(lager.NewWriterSink(os.Stdout, lager.DEBUG))
namespacePath := os.Args[1]
ns := namespace.NewNamespace(namespacePath)
misses := make(chan *netlink.Neigh, 100)
logger.Debug("Creating netlink socket")
var sock *nl.NetlinkSocket
ns.Execute(func(_ *os.File) error {
logger.Debug("Subscribing to socket")
sock = subscribe(logger)
return nil
})
ch := make(chan struct{}, 1)
logger.Debug("Monitoring misses", lager.Data{"socket": fmt.Sprintf("%+v", sock)})
go monitorMisses(logger, misses, sock)
<-ch
}
func subscribe(logger lager.Logger) *nl.NetlinkSocket {
nlsock, err := nl.Subscribe(syscall.NETLINK_ROUTE, syscall.RTNLGRP_NEIGH)
if err != nil {
logger.Error("Failed to subscribe to netlink RTNLGRP_NEIGH messages", err)
return nil
}
return nlsock
}
func monitorMisses(logger lager.Logger, misses chan *netlink.Neigh, nlsock *nl.NetlinkSocket) {
for {
msgs, err := nlsock.Receive()
if err != nil {
logger.Error("Failed to receive from netlink", err)
time.Sleep(1 * time.Second)
continue
}
for _, msg := range msgs {
processNeighMsg(logger, msg, misses)
}
}
}
func isNeighResolving(state int) bool {
return (state & (netlink.NUD_INCOMPLETE | netlink.NUD_STALE | netlink.NUD_DELAY | netlink.NUD_PROBE)) != 0
}
type neigh netlink.Neigh
func (n *neigh) String() string {
var readableState string
if n.State&netlink.NUD_INCOMPLETE != 0 {
readableState = " | " + "INCOMPLETE"
}
if n.State&netlink.NUD_REACHABLE != 0 {
readableState = " | " + "REACHABLE"
}
if n.State&netlink.NUD_STALE != 0 {
readableState = " | " + "STALE"
}
if n.State&netlink.NUD_DELAY != 0 {
readableState = " | " + "DELAY"
}
if n.State&netlink.NUD_PROBE != 0 {
readableState = " | " + "PROBE"
}
if n.State&netlink.NUD_FAILED != 0 {
readableState = " | " + "FAILED"
}
if n.State&netlink.NUD_NOARP != 0 {
readableState = " | " + "NOARP"
}
if n.State&netlink.NUD_PERMANENT != 0 {
readableState = " | " + "PERMANENT"
}
return fmt.Sprintf(
"LinkIndex: %d, Family: %d, State: %s, Type: %d, Flags: %d, IP: %s, HardwareAddr: %s",
int(n.LinkIndex),
int(n.Family),
readableState,
int(n.Type),
int(n.Flags),
n.IP.String(),
n.HardwareAddr.String(),
)
}
func processNeighMsg(logger lager.Logger, msg syscall.NetlinkMessage, misses chan *netlink.Neigh) {
n, err := netlink.NeighDeserialize(msg.Data)
if err != nil {
logger.Error("Failed to deserialize netlink ndmsg", err)
return
}
myNeigh := neigh(*n)
logger.Debug("netlink-neighbor-message", lager.Data{"msg": fmt.Sprintf("%s", myNeigh.String())})
if msg.Header.Type != syscall.RTM_GETNEIGH && msg.Header.Type != syscall.RTM_NEWNEIGH {
return
}
if !isNeighResolving(n.State) {
// misses come with NUD_STALE bit set
return
}
misses <- n
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment