From 6436c86e0c53d9fbd83d883f76cbd8a267b6428e Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Mon, 7 Feb 2022 20:50:53 +0200 Subject: Implement initial PoC for Kodi --- m4p/client.go | 203 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ m4p/discover.go | 74 +++++++++++++++++++++ m4p/m4p.go | 46 +++++++++++++ m4p/message.go | 91 +++++++++++++++++++++++++ 4 files changed, 414 insertions(+) create mode 100644 m4p/client.go create mode 100644 m4p/discover.go create mode 100644 m4p/m4p.go create mode 100644 m4p/message.go (limited to 'm4p') diff --git a/m4p/client.go b/m4p/client.go new file mode 100644 index 0000000..526e6c2 --- /dev/null +++ b/m4p/client.go @@ -0,0 +1,203 @@ +package m4p + +import ( + "context" + "encoding/hex" + "encoding/json" + "fmt" + "log" + "net" + "time" +) + +// Client represents an active connection to the magic4pc server. +type Client struct { + ctx context.Context + cancel context.CancelFunc + conn net.Conn + opts dialOptions + serverKeepalive chan struct{} + recvBuf chan Message +} + +type dialOptions struct { + updateFrequency int + filters []string +} + +// DialOption sets options for dial. +type DialOption func(*dialOptions) + +// WithUpdateFrequency sets the RemoteUpdate frequency. +func WithUpdateFrequency(f int) func(*dialOptions) { + return func(o *dialOptions) { + o.updateFrequency = f + } +} + +// WithFilters specifies the filters used for RemoteUpdate. +func WithFilters(filters ...string) func(*dialOptions) { + return func(o *dialOptions) { + o.filters = filters + } +} + +// Dial connects to a magic4pc server running in webOS. +func Dial(ctx context.Context, addr string, opts ...DialOption) (*Client, error) { + o := dialOptions{ + updateFrequency: 250, + filters: DefaultFilters, + } + for _, opt := range opts { + opt(&o) + } + + d := &net.Dialer{ + Timeout: 5 * time.Second, + } + conn, err := d.DialContext(ctx, "udp4", addr) + if err != nil { + return nil, err + } + + ctx, cancel := context.WithCancel(context.Background()) + c := &Client{ + ctx: ctx, + cancel: cancel, + conn: conn, + opts: o, + serverKeepalive: make(chan struct{}, 1), + recvBuf: make(chan Message, 10), // Buffer up to 10 messages after which we block. + } + + // Register our client with the server. + m := NewMessage(SubSensorMessage) + m.Register = &Register{ + UpdateFrequency: c.opts.updateFrequency, + Filter: c.opts.filters, + } + + err = c.Send(m) + if err != nil { + c.Close() + return nil, fmt.Errorf("register failed: %w", err) + } + + // Tell the server that we're alive and well. + go c.keepalive() + go c.recv() + + return c, nil +} + +func (c *Client) recv() { + var buf [1024]byte +recvLoop: + for { + select { + case <-c.ctx.Done(): + return + default: + } + + n, err := c.conn.Read(buf[:]) + if err != nil { + log.Printf("m4p: Client: recv: read udp packet failed: %v", err) + continue + } + + m, err := decode(buf[:n]) + if err != nil { + log.Printf("m4p: Client: recv: decode failed: %v", err) + continue + } + + switch m.Type { + case KeepAliveMessage: + // log.Printf("m4p: Client: recv: got %s", m.Type) + + // Trigger server keepalive, non-blocking (chan is buffered). + select { + case c.serverKeepalive <- struct{}{}: + default: + } + + goto recvLoop + + case InputMessage: + log.Printf("m4p: Client: recv: got %s: %v", m.Type, m.Input) + + case RemoteUpdateMessage: + log.Printf("m4p: Client: recv: got %s: %s", m.Type, hex.EncodeToString(m.RemoteUpdate.Payload)) + + default: + log.Printf("m4p: Client: recv: unknown message: %s", m.Type) + } + + select { + case c.recvBuf <- m: + default: + log.Printf("m4p: Client: recv: buffer full, discarding message: %s", m.Type) + } + } +} + +func (c *Client) keepalive() { + defer c.Close() + + serverDeadline := time.After(keepaliveTimeout) + clientKeepalive := time.After(clientKeepaliveInterval) + + for { + select { + case <-c.ctx.Done(): + return + + case <-c.serverKeepalive: + serverDeadline = time.After(keepaliveTimeout) + + case <-serverDeadline: + log.Printf("m4p: Client: keepalive: server keepalive deadline reached, disconnecting...") + return + + case <-clientKeepalive: + _, err := c.conn.Write([]byte("{}")) + if err != nil { + log.Printf("m4p: Client: keepalive: send client keepalive failed, disconnecting...") + return + } + clientKeepalive = time.After(clientKeepaliveInterval) + } + } +} + +// Send a message to the magic4pc server. +func (c *Client) Send(m Message) error { + b, err := json.Marshal(m) + if err != nil { + return fmt.Errorf("json encode message failed: %w", err) + } + if _, err = c.conn.Write(b); err != nil { + return fmt.Errorf("write message failed: %w", err) + } + return nil +} + +// Recv messages from the magic4pc server. Keepalives are handled +// transparently by the client and are not observable. +func (c *Client) Recv(ctx context.Context) (Message, error) { + select { + case <-ctx.Done(): + return Message{}, ctx.Err() + case <-c.ctx.Done(): + return Message{}, c.ctx.Err() + case m := <-c.recvBuf: + return m, nil + } +} + +// Close the client and connection. +func (c *Client) Close() error { + c.cancel() + return c.conn.Close() +} diff --git a/m4p/discover.go b/m4p/discover.go new file mode 100644 index 0000000..a63dde2 --- /dev/null +++ b/m4p/discover.go @@ -0,0 +1,74 @@ +package m4p + +import ( + "log" + "net" +) + +// Discoverer magic4pc servers. +type Discoverer struct { + ln *net.UDPConn + device chan DeviceInfo +} + +// NewDiscover returns a new Discoverer that listens on the broadcast port. +func NewDiscoverer(broadcastPort int) (*Discoverer, error) { + addr := net.UDPAddr{ + Port: broadcastPort, + IP: net.ParseIP("0.0.0.0"), + } + + ln, err := net.ListenUDP("udp", &addr) + if err != nil { + panic(err) + } + + d := &Discoverer{ + ln: ln, + device: make(chan DeviceInfo), // Unbuffered, discard when nobody is listening. + } + go d.discover() + + return d, nil +} + +func (d *Discoverer) discover() { + var buf [1024]byte + for { + n, addr, err := d.ln.ReadFromUDP(buf[:]) + if err != nil { + log.Printf("m4p: Discoverer: discover: read udp packet failed: %v", err) + continue + } + + m, err := decode(buf[:n]) + if err != nil { + log.Printf("m4p: Discoverer: discover: decode failed: %v", err) + } + + switch m.Type { + case Magic4PCAdMessage: + dev := m.DeviceInfo + dev.IPAddr = addr.IP.String() + log.Printf("m4p: Discoverer: discover: found device: %#v", dev) + + select { + case d.device <- *dev: + default: + } + + default: + log.Printf("m4p: Discoverer: discover: unknown message: %s", m.Type) + } + } +} + +// NextDevice returns a newly discovered magic4pc server. +func (d *Discoverer) NextDevice() <-chan DeviceInfo { + return d.device +} + +// Close the Discoverer and stop listening for broadcasts. +func (d *Discoverer) Close() error { + return d.ln.Close() +} diff --git a/m4p/m4p.go b/m4p/m4p.go new file mode 100644 index 0000000..07f78a5 --- /dev/null +++ b/m4p/m4p.go @@ -0,0 +1,46 @@ +package m4p + +import "time" + +// Protocol constants. +const ( + protocolVersion = 1 + keepaliveTimeout = 3 * time.Second + clientKeepaliveInterval = 2 * time.Second +) + +// Magic remote keycodes. +const ( + KeyWheelPressed = 13 + KeyChannelUp = 33 + KeyChannelDown = 34 + KeyLeft = 37 + KeyUp = 38 + KeyRight = 39 + KeyDown = 40 + Key0 = 48 + Key1 = 49 + Key2 = 50 + Key3 = 51 + Key4 = 52 + Key5 = 53 + Key6 = 54 + Key7 = 55 + Key8 = 56 + Key9 = 57 + KeyRed = 403 + KeyGreen = 404 + KeyYellow = 405 + KeyBlue = 406 + KeyBack = 461 +) + +// DefaultFilters used for remote updates. +var DefaultFilters = []string{ + "returnValue", + "deviceId", + "coordinate", + "gyroscope", + "acceleration", + "quaternion", +} diff --git a/m4p/message.go b/m4p/message.go new file mode 100644 index 0000000..34a9383 --- /dev/null +++ b/m4p/message.go @@ -0,0 +1,91 @@ +package m4p + +import ( + "bytes" + "encoding/json" +) + +type MessageType string + +// MessageType enums. +const ( + Magic4PCAdMessage MessageType = "magic4pc_ad" + SubSensorMessage MessageType = "sub_sensor" + RemoteUpdateMessage MessageType = "remote_update" + InputMessage MessageType = "input" + KeepAliveMessage MessageType = "keepalive" +) + +// Message format sent over the wire. +type Message struct { + Type MessageType `json:"t"` + Version int `json:"version"` + *DeviceInfo + *Register + *RemoteUpdate + *Input +} + +// NewMessage initializes a message with the type and protocol version. +func NewMessage(typ MessageType) Message { + return Message{ + Type: typ, + Version: protocolVersion, + } +} + +// DeviceInfo represents a magic4pc server. +type DeviceInfo struct { + Model string `json:"model"` + IPAddr string `json:"-"` + Port int `json:"port"` + MAC string `json:"mac"` +} + +// Register payload for registering a new client on the server. +type Register struct { + UpdateFrequency int `json:"updateFreq"` + Filter []string `json:"filter"` +} + +// Input event (key pressed). +type Input struct { + Parameters struct { + KeyCode int `json:"keyCode"` + IsDown bool `json:"isDown"` + } `json:"parameters"` +} + +// RemoteUpdate event, sensor data from the magic remote. +type RemoteUpdate struct { + // filters []string + Payload []byte `json:"payload"` +} + +// type ( +// ReturnValue uint8 +// DeviceID uint8 +// Coordinates struct{ X, Y int32 } +// Gyroscope struct{ X, Y, Z float32 } +// Acceleration struct{ X, Y, Z float32 } +// Quaternion struct{ Q0, Q1, Q2, Q3 float32 } +// ) + +// func (ru RemoteUpdate) Coordinates() Coordinates { +// return Coordinates{} +// } + +// func (ru RemoteUpdate) Acceleration() Acceleration { +// return Acceleration{} +// } + +func decode(b []byte) (Message, error) { + var m Message + dec := json.NewDecoder(bytes.NewReader(b)) + dec.DisallowUnknownFields() + if err := dec.Decode(&m); err != nil { + return Message{}, err + } + + return m, nil +} -- cgit v1.2.3