386 lines
11 KiB
Go
386 lines
11 KiB
Go
package main
|
|
|
|
import (
|
|
"fmt"
|
|
"net"
|
|
"os"
|
|
"path"
|
|
"reflect"
|
|
_ "runtime/debug"
|
|
"time"
|
|
|
|
log "github.com/sirupsen/logrus"
|
|
"golang.org/x/net/context"
|
|
"google.golang.org/grpc"
|
|
pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
|
|
)
|
|
|
|
const (
|
|
resourceNamePrefix = "deviceplugindemo/"
|
|
serverSockPath = pluginapi.DevicePluginPath
|
|
// AWS_SN = "F1-Node"
|
|
)
|
|
|
|
// FPGADevicePluginServer implements the Kubernetes device plugin API
|
|
type FPGADevicePluginServer struct {
|
|
devType string
|
|
devices map[string]Device
|
|
socket string
|
|
stop chan interface{}
|
|
update chan map[string]Device
|
|
|
|
server *grpc.Server
|
|
}
|
|
|
|
type FPGADevicePlugin struct {
|
|
devices map[string]map[string]Device
|
|
servers map[string]*FPGADevicePluginServer
|
|
updateChan chan map[string]map[string]Device
|
|
}
|
|
|
|
// NewFPGADevicePlugin returns an initialized FPGADevicePlugin
|
|
func NewFPGADevicePlugin() *FPGADevicePlugin {
|
|
log.Debugf("create FPGA device plugin")
|
|
updateChan := make(chan map[string]map[string]Device)
|
|
plugin := FPGADevicePlugin{
|
|
devices: make(map[string]map[string]Device),
|
|
servers: make(map[string]*FPGADevicePluginServer),
|
|
updateChan: updateChan,
|
|
}
|
|
|
|
go func() {
|
|
for {
|
|
devices, err := GetDevices()
|
|
if err != nil {
|
|
time.Sleep(75 * time.Second)
|
|
devices, err = GetDevices()
|
|
if err != nil {
|
|
log.Errorf("Error to get FPGA devices: %v", err)
|
|
break
|
|
}
|
|
}
|
|
devMap := make(map[string]map[string]Device)
|
|
for _, device := range devices {
|
|
|
|
DSAtype := device.index
|
|
id := device.deviceID
|
|
if subMap, ok := devMap[DSAtype]; ok {
|
|
subMap = devMap[DSAtype]
|
|
subMap[id] = device
|
|
} else {
|
|
subMap = make(map[string]Device)
|
|
devMap[DSAtype] = subMap
|
|
subMap[id] = device
|
|
}
|
|
}
|
|
updateChan <- devMap
|
|
time.Sleep(5 * time.Second)
|
|
}
|
|
close(updateChan)
|
|
}()
|
|
|
|
return &plugin
|
|
}
|
|
|
|
func (m *FPGADevicePlugin) checkDeviceUpdate(n map[string]map[string]Device) {
|
|
added := make(map[string]map[string]Device)
|
|
updated := make(map[string]map[string]Device)
|
|
removed := make(map[string]map[string]Device)
|
|
|
|
for oDevType, oDevices := range m.devices {
|
|
if nDevices, ok := n[oDevType]; ok {
|
|
if !reflect.DeepEqual(oDevices, nDevices) {
|
|
updated[oDevType] = nDevices
|
|
}
|
|
delete(n, oDevType)
|
|
} else {
|
|
removed[oDevType] = oDevices
|
|
}
|
|
}
|
|
for nDevType, nDevices := range n {
|
|
added[nDevType] = nDevices
|
|
}
|
|
|
|
//create new server for added devices
|
|
for aDevType, aDevices := range added {
|
|
devicePluginServer := m.NewFPGADevicePluginServer(aDevType, aDevices)
|
|
m.devices[aDevType] = aDevices
|
|
m.servers[aDevType] = devicePluginServer
|
|
go func(aDevType string, aDevices map[string]Device, name string) {
|
|
if err := m.servers[aDevType].Serve(name); err != nil {
|
|
log.Println("Could not contact Kubelet, Exit. Did you enable the device plugin feature gate?")
|
|
os.Exit(1)
|
|
}
|
|
m.servers[aDevType].update <- aDevices
|
|
}(aDevType, aDevices, resourceNamePrefix+aDevType)
|
|
}
|
|
|
|
//stop server for removed devices
|
|
for rDevType, rDevices := range removed {
|
|
log.Debugf("Remove device %v", rDevices)
|
|
m.servers[rDevType].Stop()
|
|
delete(m.servers, rDevType)
|
|
delete(m.devices, rDevType)
|
|
}
|
|
|
|
//send update for updated devices
|
|
for uDevType, uDevices := range updated {
|
|
m.devices[uDevType] = uDevices
|
|
m.servers[uDevType].update <- uDevices
|
|
}
|
|
}
|
|
|
|
// NewFPGADevicePluginServer returns an initialized FPGADevicePluginServer
|
|
func (m *FPGADevicePlugin) NewFPGADevicePluginServer(devType string, devices map[string]Device) *FPGADevicePluginServer {
|
|
return &FPGADevicePluginServer{
|
|
devType: devType,
|
|
devices: devices,
|
|
socket: path.Join(serverSockPath, devType+"-demodevice.sock"),
|
|
stop: make(chan interface{}),
|
|
update: make(chan map[string]Device, 1),
|
|
}
|
|
}
|
|
|
|
// waitForServer checks if grpc server is alive
|
|
// by making grpc blocking connection to the server socket
|
|
func waitForServer(socket string, timeout time.Duration) error {
|
|
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
|
defer cancel()
|
|
conn, err := grpc.DialContext(ctx, socket, grpc.WithInsecure(), grpc.WithBlock(),
|
|
grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
|
|
return net.DialTimeout("unix", addr, timeout)
|
|
}),
|
|
)
|
|
if conn != nil {
|
|
conn.Close()
|
|
}
|
|
|
|
if err != nil {
|
|
fmt.Errorf("Failed dial context at %s", socket)
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (m *FPGADevicePluginServer) deviceExists(id string) bool {
|
|
for k, _ := range m.devices {
|
|
if k == id {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (m *FPGADevicePluginServer) PreStartContainer(ctx context.Context, rqt *pluginapi.PreStartContainerRequest) (*pluginapi.PreStartContainerResponse, error) {
|
|
return nil, fmt.Errorf("PreStartContainer() should not be called")
|
|
}
|
|
|
|
func (m *FPGADevicePluginServer) GetDevicePluginOptions(ctx context.Context, empty *pluginapi.Empty) (*pluginapi.DevicePluginOptions, error) {
|
|
fmt.Println("GetDevicePluginOptions: return empty options")
|
|
return new(pluginapi.DevicePluginOptions), nil
|
|
}
|
|
|
|
// Start starts the gRPC server of the device plugin
|
|
func (m *FPGADevicePluginServer) Start() error {
|
|
err := m.cleanup()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
sock, err := net.Listen("unix", m.socket)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
m.server = grpc.NewServer()
|
|
pluginapi.RegisterDevicePluginServer(m.server, m)
|
|
|
|
go m.server.Serve(sock)
|
|
|
|
// Wait for the server to start
|
|
if err = waitForServer(m.socket, 10*time.Second); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Stop stops the gRPC server
|
|
func (m *FPGADevicePluginServer) Stop() error {
|
|
if m.server == nil {
|
|
return nil
|
|
}
|
|
|
|
m.server.Stop()
|
|
m.server = nil
|
|
close(m.stop)
|
|
close(m.update)
|
|
|
|
return m.cleanup()
|
|
}
|
|
|
|
func (m *FPGADevicePluginServer) cleanup() error {
|
|
if err := os.Remove(m.socket); err != nil && !os.IsNotExist(err) {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Register registers the device plugin for the given resourceName with Kubelet.
|
|
func (m *FPGADevicePluginServer) Register(kubeletEndpoint, resourceName string) error {
|
|
conn, err := grpc.Dial(kubeletEndpoint, grpc.WithInsecure(),
|
|
grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
|
|
return net.DialTimeout("unix", addr, timeout)
|
|
}))
|
|
|
|
if err != nil {
|
|
log.Debugf("Cann't connect to kubelet service")
|
|
return err
|
|
}
|
|
defer conn.Close()
|
|
|
|
client := pluginapi.NewRegistrationClient(conn)
|
|
reqt := &pluginapi.RegisterRequest{
|
|
Version: pluginapi.Version,
|
|
Endpoint: path.Base(m.socket),
|
|
ResourceName: resourceName,
|
|
}
|
|
|
|
_, err = client.Register(context.Background(), reqt)
|
|
if err != nil {
|
|
log.Debugf("Cann't register to kubelet service")
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// func IsContain(items []string, item string) bool {
|
|
// AWS_SN := "F1-Node"
|
|
// for _, eachItem := range items {
|
|
// if eachItem == item && strings.EqualFold(item, AWS_SN) != true {
|
|
// return true
|
|
// }
|
|
// }
|
|
// return false
|
|
// }
|
|
func (m *FPGADevicePluginServer) sendDevices(s pluginapi.DevicePlugin_ListAndWatchServer) error {
|
|
resp := new(pluginapi.ListAndWatchResponse)
|
|
|
|
check_range := m.devices
|
|
SerialNums := []string{}
|
|
for _, device := range check_range {
|
|
if device.SN == "" {
|
|
log.Printf("Error, Device %v has empty Serial number", device.deviceID)
|
|
} else {
|
|
SerialNums = append(SerialNums, device.SN)
|
|
tem := &pluginapi.Device{
|
|
ID: device.deviceID,
|
|
Health: device.Healthy,
|
|
}
|
|
resp.Devices = append(resp.Devices, tem)
|
|
}
|
|
}
|
|
log.Printf("Check SeialNums arry: %v", SerialNums)
|
|
log.Printf("Sending %d device(s) %v to kubelet", len(resp.Devices), resp.Devices)
|
|
if err := s.Send(resp); err != nil {
|
|
m.Stop()
|
|
log.Debugf("Cannot update device list")
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ListAndWatch lists devices and update that list according to the health status
|
|
func (m *FPGADevicePluginServer) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error {
|
|
log.Debugf("In ListAndWatch(%s): stream: %v", m.devType, s)
|
|
//debug.PrintStack()
|
|
for m.devices = range m.update {
|
|
if err := m.sendDevices(s); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Allocate which return list of devices.
|
|
func (m *FPGADevicePluginServer) Allocate(ctx context.Context, req *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
|
|
log.Debugf("In Allocate()")
|
|
response := new(pluginapi.AllocateResponse)
|
|
for _, creq := range req.ContainerRequests {
|
|
log.Debugf("Request IDs: %v", creq.DevicesIDs)
|
|
|
|
cres := new(pluginapi.ContainerAllocateResponse)
|
|
|
|
// Check same serial number devices, devices with same serail number "F1-node" will be marked as independent devices
|
|
deviceIDs_arry := creq.DevicesIDs
|
|
|
|
for _, id := range deviceIDs_arry {
|
|
log.Printf("Receiving request %s", id)
|
|
dev, ok := m.devices[id]
|
|
if !ok {
|
|
return nil, fmt.Errorf("Invalid allocation request with non-existing device %s", id)
|
|
}
|
|
if !m.deviceExists(id) {
|
|
return nil, fmt.Errorf("invalid allocation request: unknown device: %s", id)
|
|
}
|
|
fname := path.Join(SysfsDevices, dev.deviceID, DeviceFile)
|
|
cres.Mounts = append(cres.Mounts, &pluginapi.Mount{
|
|
HostPath: fname,
|
|
ContainerPath: fname,
|
|
ReadOnly: false,
|
|
})
|
|
response.ContainerResponses = append(response.ContainerResponses, cres)
|
|
}
|
|
}
|
|
return response, nil
|
|
}
|
|
|
|
// Serve starts the gRPC server and register the device plugin to Kubelet
|
|
func (m *FPGADevicePluginServer) Serve(resourceName string) error {
|
|
log.Debugf("In Serve(%s)", m.socket)
|
|
err := m.Start()
|
|
if err != nil {
|
|
log.Errorf("Could not start device plugin: %v", err)
|
|
return err
|
|
}
|
|
log.Infof("Starting to serve on %s", m.socket)
|
|
|
|
err = m.Register(pluginapi.KubeletSocket, resourceName)
|
|
if err != nil {
|
|
log.Errorf("Could not register device plugin: %v", err)
|
|
m.Stop()
|
|
return err
|
|
}
|
|
log.Infof("Registered device plugin with Kubelet %s", resourceName)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (m *FPGADevicePluginServer) GetPreferredAllocation(ctx context.Context, req *pluginapi.PreferredAllocationRequest) (*pluginapi.PreferredAllocationResponse, error) {
|
|
response := new(pluginapi.PreferredAllocationResponse)
|
|
for _, creq := range req.ContainerRequests {
|
|
log.Debugf("Request IDs: %v", creq.AvailableDeviceIDs)
|
|
|
|
cres := new(pluginapi.ContainerPreferredAllocationResponse)
|
|
|
|
// Check same serial number devices, devices with same serail number "F1-node" will be marked as independent devices
|
|
deviceIDs_arry := creq.AvailableDeviceIDs
|
|
|
|
for _, id := range deviceIDs_arry {
|
|
log.Printf("Receiving request %s", id)
|
|
dev, ok := m.devices[id]
|
|
if !ok {
|
|
return nil, fmt.Errorf("Invalid allocation request with non-existing device %s", id)
|
|
}
|
|
if !m.deviceExists(id) {
|
|
return nil, fmt.Errorf("invalid allocation request: unknown device: %s", id)
|
|
}
|
|
fname := path.Join(SysfsDevices, dev.deviceID, DeviceFile)
|
|
cres.DeviceIDs = append(cres.DeviceIDs, fname)
|
|
response.ContainerResponses = append(response.ContainerResponses, cres)
|
|
}
|
|
}
|
|
return response, nil
|
|
}
|