diff options
Diffstat (limited to 'm4p')
| -rw-r--r-- | m4p/client.go | 203 | ||||
| -rw-r--r-- | m4p/discover.go | 74 | ||||
| -rw-r--r-- | m4p/m4p.go | 46 | ||||
| -rw-r--r-- | m4p/message.go | 91 | 
4 files changed, 414 insertions, 0 deletions
| diff --git a/m4p/client.go b/m4p/client.go new file mode 100644 index 0000000..526e6c2 --- /dev/null +++ b/m4p/client.go @@ -0,0 +1,203 @@ +package m4p + +import ( +	"context" +	"encoding/hex" +	"encoding/json" +	"fmt" +	"log" +	"net" +	"time" +) + +// Client represents an active connection to the magic4pc server. +type Client struct { +	ctx             context.Context +	cancel          context.CancelFunc +	conn            net.Conn +	opts            dialOptions +	serverKeepalive chan struct{} +	recvBuf         chan Message +} + +type dialOptions struct { +	updateFrequency int +	filters         []string +} + +// DialOption sets options for dial. +type DialOption func(*dialOptions) + +// WithUpdateFrequency sets the RemoteUpdate frequency. +func WithUpdateFrequency(f int) func(*dialOptions) { +	return func(o *dialOptions) { +		o.updateFrequency = f +	} +} + +// WithFilters specifies the filters used for RemoteUpdate. +func WithFilters(filters ...string) func(*dialOptions) { +	return func(o *dialOptions) { +		o.filters = filters +	} +} + +// Dial connects to a magic4pc server running in webOS. +func Dial(ctx context.Context, addr string, opts ...DialOption) (*Client, error) { +	o := dialOptions{ +		updateFrequency: 250, +		filters:         DefaultFilters, +	} +	for _, opt := range opts { +		opt(&o) +	} + +	d := &net.Dialer{ +		Timeout: 5 * time.Second, +	} +	conn, err := d.DialContext(ctx, "udp4", addr) +	if err != nil { +		return nil, err +	} + +	ctx, cancel := context.WithCancel(context.Background()) +	c := &Client{ +		ctx:             ctx, +		cancel:          cancel, +		conn:            conn, +		opts:            o, +		serverKeepalive: make(chan struct{}, 1), +		recvBuf:         make(chan Message, 10), // Buffer up to 10 messages after which we block. +	} + +	// Register our client with the server. +	m := NewMessage(SubSensorMessage) +	m.Register = &Register{ +		UpdateFrequency: c.opts.updateFrequency, +		Filter:          c.opts.filters, +	} + +	err = c.Send(m) +	if err != nil { +		c.Close() +		return nil, fmt.Errorf("register failed: %w", err) +	} + +	// Tell the server that we're alive and well. +	go c.keepalive() +	go c.recv() + +	return c, nil +} + +func (c *Client) recv() { +	var buf [1024]byte +recvLoop: +	for { +		select { +		case <-c.ctx.Done(): +			return +		default: +		} + +		n, err := c.conn.Read(buf[:]) +		if err != nil { +			log.Printf("m4p: Client: recv: read udp packet failed: %v", err) +			continue +		} + +		m, err := decode(buf[:n]) +		if err != nil { +			log.Printf("m4p: Client: recv: decode failed: %v", err) +			continue +		} + +		switch m.Type { +		case KeepAliveMessage: +			// log.Printf("m4p: Client: recv: got %s", m.Type) + +			// Trigger server keepalive, non-blocking (chan is buffered). +			select { +			case c.serverKeepalive <- struct{}{}: +			default: +			} + +			goto recvLoop + +		case InputMessage: +			log.Printf("m4p: Client: recv: got %s: %v", m.Type, m.Input) + +		case RemoteUpdateMessage: +			log.Printf("m4p: Client: recv: got %s: %s", m.Type, hex.EncodeToString(m.RemoteUpdate.Payload)) + +		default: +			log.Printf("m4p: Client: recv: unknown message: %s", m.Type) +		} + +		select { +		case c.recvBuf <- m: +		default: +			log.Printf("m4p: Client: recv: buffer full, discarding message: %s", m.Type) +		} +	} +} + +func (c *Client) keepalive() { +	defer c.Close() + +	serverDeadline := time.After(keepaliveTimeout) +	clientKeepalive := time.After(clientKeepaliveInterval) + +	for { +		select { +		case <-c.ctx.Done(): +			return + +		case <-c.serverKeepalive: +			serverDeadline = time.After(keepaliveTimeout) + +		case <-serverDeadline: +			log.Printf("m4p: Client: keepalive: server keepalive deadline reached, disconnecting...") +			return + +		case <-clientKeepalive: +			_, err := c.conn.Write([]byte("{}")) +			if err != nil { +				log.Printf("m4p: Client: keepalive: send client keepalive failed, disconnecting...") +				return +			} +			clientKeepalive = time.After(clientKeepaliveInterval) +		} +	} +} + +// Send a message to the magic4pc server. +func (c *Client) Send(m Message) error { +	b, err := json.Marshal(m) +	if err != nil { +		return fmt.Errorf("json encode message failed: %w", err) +	} +	if _, err = c.conn.Write(b); err != nil { +		return fmt.Errorf("write message failed: %w", err) +	} +	return nil +} + +// Recv messages from the magic4pc server. Keepalives are handled +// transparently by the client and are not observable. +func (c *Client) Recv(ctx context.Context) (Message, error) { +	select { +	case <-ctx.Done(): +		return Message{}, ctx.Err() +	case <-c.ctx.Done(): +		return Message{}, c.ctx.Err() +	case m := <-c.recvBuf: +		return m, nil +	} +} + +// Close the client and connection. +func (c *Client) Close() error { +	c.cancel() +	return c.conn.Close() +} diff --git a/m4p/discover.go b/m4p/discover.go new file mode 100644 index 0000000..a63dde2 --- /dev/null +++ b/m4p/discover.go @@ -0,0 +1,74 @@ +package m4p + +import ( +	"log" +	"net" +) + +// Discoverer magic4pc servers. +type Discoverer struct { +	ln     *net.UDPConn +	device chan DeviceInfo +} + +// NewDiscover returns a new Discoverer that listens on the broadcast port. +func NewDiscoverer(broadcastPort int) (*Discoverer, error) { +	addr := net.UDPAddr{ +		Port: broadcastPort, +		IP:   net.ParseIP("0.0.0.0"), +	} + +	ln, err := net.ListenUDP("udp", &addr) +	if err != nil { +		panic(err) +	} + +	d := &Discoverer{ +		ln:     ln, +		device: make(chan DeviceInfo), // Unbuffered, discard when nobody is listening. +	} +	go d.discover() + +	return d, nil +} + +func (d *Discoverer) discover() { +	var buf [1024]byte +	for { +		n, addr, err := d.ln.ReadFromUDP(buf[:]) +		if err != nil { +			log.Printf("m4p: Discoverer: discover: read udp packet failed: %v", err) +			continue +		} + +		m, err := decode(buf[:n]) +		if err != nil { +			log.Printf("m4p: Discoverer: discover: decode failed: %v", err) +		} + +		switch m.Type { +		case Magic4PCAdMessage: +			dev := m.DeviceInfo +			dev.IPAddr = addr.IP.String() +			log.Printf("m4p: Discoverer: discover: found device: %#v", dev) + +			select { +			case d.device <- *dev: +			default: +			} + +		default: +			log.Printf("m4p: Discoverer: discover: unknown message: %s", m.Type) +		} +	} +} + +// NextDevice returns a newly discovered magic4pc server. +func (d *Discoverer) NextDevice() <-chan DeviceInfo { +	return d.device +} + +// Close the Discoverer and stop listening for broadcasts. +func (d *Discoverer) Close() error { +	return d.ln.Close() +} diff --git a/m4p/m4p.go b/m4p/m4p.go new file mode 100644 index 0000000..07f78a5 --- /dev/null +++ b/m4p/m4p.go @@ -0,0 +1,46 @@ +package m4p + +import "time" + +// Protocol constants. +const ( +	protocolVersion         = 1 +	keepaliveTimeout        = 3 * time.Second +	clientKeepaliveInterval = 2 * time.Second +) + +// Magic remote keycodes. +const ( +	KeyWheelPressed = 13 +	KeyChannelUp    = 33 +	KeyChannelDown  = 34 +	KeyLeft         = 37 +	KeyUp           = 38 +	KeyRight        = 39 +	KeyDown         = 40 +	Key0            = 48 +	Key1            = 49 +	Key2            = 50 +	Key3            = 51 +	Key4            = 52 +	Key5            = 53 +	Key6            = 54 +	Key7            = 55 +	Key8            = 56 +	Key9            = 57 +	KeyRed          = 403 +	KeyGreen        = 404 +	KeyYellow       = 405 +	KeyBlue         = 406 +	KeyBack         = 461 +) + +// DefaultFilters used for remote updates. +var DefaultFilters = []string{ +	"returnValue", +	"deviceId", +	"coordinate", +	"gyroscope", +	"acceleration", +	"quaternion", +} diff --git a/m4p/message.go b/m4p/message.go new file mode 100644 index 0000000..34a9383 --- /dev/null +++ b/m4p/message.go @@ -0,0 +1,91 @@ +package m4p + +import ( +	"bytes" +	"encoding/json" +) + +type MessageType string + +// MessageType enums. +const ( +	Magic4PCAdMessage   MessageType = "magic4pc_ad" +	SubSensorMessage    MessageType = "sub_sensor" +	RemoteUpdateMessage MessageType = "remote_update" +	InputMessage        MessageType = "input" +	KeepAliveMessage    MessageType = "keepalive" +) + +// Message format sent over the wire. +type Message struct { +	Type    MessageType `json:"t"` +	Version int         `json:"version"` +	*DeviceInfo +	*Register +	*RemoteUpdate +	*Input +} + +// NewMessage initializes a message with the type and protocol version. +func NewMessage(typ MessageType) Message { +	return Message{ +		Type:    typ, +		Version: protocolVersion, +	} +} + +// DeviceInfo represents a magic4pc server. +type DeviceInfo struct { +	Model  string `json:"model"` +	IPAddr string `json:"-"` +	Port   int    `json:"port"` +	MAC    string `json:"mac"` +} + +// Register payload for registering a new client on the server. +type Register struct { +	UpdateFrequency int      `json:"updateFreq"` +	Filter          []string `json:"filter"` +} + +// Input event (key pressed). +type Input struct { +	Parameters struct { +		KeyCode int  `json:"keyCode"` +		IsDown  bool `json:"isDown"` +	} `json:"parameters"` +} + +// RemoteUpdate event, sensor data from the magic remote. +type RemoteUpdate struct { +	// filters []string +	Payload []byte `json:"payload"` +} + +// type ( +// 	ReturnValue  uint8 +// 	DeviceID     uint8 +// 	Coordinates  struct{ X, Y int32 } +// 	Gyroscope    struct{ X, Y, Z float32 } +// 	Acceleration struct{ X, Y, Z float32 } +// 	Quaternion   struct{ Q0, Q1, Q2, Q3 float32 } +// ) + +// func (ru RemoteUpdate) Coordinates() Coordinates { +// 	return Coordinates{} +// } + +// func (ru RemoteUpdate) Acceleration() Acceleration { +// 	return Acceleration{} +// } + +func decode(b []byte) (Message, error) { +	var m Message +	dec := json.NewDecoder(bytes.NewReader(b)) +	dec.DisallowUnknownFields() +	if err := dec.Decode(&m); err != nil { +		return Message{}, err +	} + +	return m, nil +} | 
