pubsub

v0.0.0-...-ec8f16c
Warning

This package is not in the latest version of its module.

Published: Nov 14, 2024 License: MIT Imports: 8 Imported by: 1

README

pubsub

A generic pubsub implementation for Go.

package main

import (
	"bytes"
	"context"
	"fmt"
	"log/slog"

	"github.com/picosh/pubsub"
)

func main() {
	ctx := context.TODO()
	logger := slog.Default()
	broker := pubsub.NewMulticast(logger)

	chann := []*pubsub.Channel{
		pubsub.NewChannel("my-topic"),
	}

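	// Publish from a goroutine; the final argument is blockWrite.
	go func() {
		writer := bytes.NewBufferString("my data")
		if err := broker.Pub(ctx, "pubID", writer, chann, false); err != nil {
			logger.Error("pub failed", "err", err)
		}
	}()

	// Subscribe on the same channels; the final argument is keepAlive.
	reader := bytes.NewBufferString("")
	if err := broker.Sub(ctx, "subID", reader, chann, false); err != nil {
		logger.Error("sub failed", "err", err)
	}

	// result
	fmt.Println("data from pub:", reader)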
}

pubsub over ssh

The simplest pubsub system for everyday automation needs.

Using wish, we can integrate our pubsub system into an SSH app.

# term 1
mkdir ./ssh_data
cat ~/.ssh/id_ed25519.pub > ./ssh_data/authorized_keys
go run ./cmd/example

# term 2
ssh -p 2222 localhost sub xyz

# term 3
echo "hello world" | ssh -p 2222 localhost pub xyz

Documentation

Constants

const (
	ChannelActionData = iota
	ChannelActionClose
)

Types

type BaseBroker

type BaseBroker struct {
	Channels *syncmap.Map[string, *Channel]
	Logger   *slog.Logger
}

func (*BaseBroker) Cleanup

func (b *BaseBroker) Cleanup()

func (*BaseBroker) Connect

func (b *BaseBroker) Connect(client *Client, channels []*Channel) (error, error)

func (*BaseBroker) GetChannels

func (b *BaseBroker) GetChannels() iter.Seq2[string, *Channel]

func (*BaseBroker) GetClients

func (b *BaseBroker) GetClients() iter.Seq2[string, *Client]

type Broker

type Broker interface {
	GetChannels() iter.Seq2[string, *Channel]
	GetClients() iter.Seq2[string, *Client]
	Connect(*Client, []*Channel) (error, error)
}

Broker receives published messages and dispatches each message to the subscribing clients. A message contains a topic that clients subscribe to, and brokers use these subscription lists to determine which clients receive the message.
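
To make the idea of subscription lists concrete, here is a small stdlib-only sketch of topic-based routing. It is not how this package implements its broker; the `subscriptions` type and its methods are hypothetical names chosen for illustration.

```go
package main

import (
	"fmt"
	"sort"
)

// subscriptions is a hypothetical subscription list: topic -> set of client IDs.
type subscriptions map[string]map[string]struct{}

// subscribe records that clientID wants messages published on topic.
func (s subscriptions) subscribe(topic, clientID string) {
	if s[topic] == nil {
		s[topic] = map[string]struct{}{}
	}
	s[topic][clientID] = struct{}{}
}

// recipients returns the sorted IDs of the clients subscribed to topic.
// An unknown topic yields an empty slice.
func (s subscriptions) recipients(topic string) []string {
	ids := make([]string, 0, len(s[topic]))
	for id := range s[topic] {
		ids = append(ids, id)
	}
	sort.Strings(ids)
	return ids
}

func main() {
	subs := subscriptions{}
	subs.subscribe("my-topic", "sub-1")
	subs.subscribe("my-topic", "sub-2")
	subs.subscribe("other", "sub-3")

	// Only the clients subscribed to "my-topic" are selected.
	fmt.Println(subs.recipients("my-topic"))
}
```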

type Channel

type Channel struct {
	Topic   string
	Done    chan struct{}
	Data    chan ChannelMessage
	Clients *syncmap.Map[string, *Client]
	// contains filtered or unexported fields
}

Channel is a container for a topic. It holds the list of clients and a data channel to receive a message.
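
The combination of a data channel and a done signal suggests a forwarding loop. The sketch below shows one way such a loop could look, under the assumption that a message is delivered to every client except its sender. The `message` and `topicChannel` types and the `handle` and `roundTrip` functions are hypothetical and simplified; the package's own `Handle` method may behave differently.

```go
package main

import (
	"fmt"
	"sync"
)

// message is a hypothetical, reduced stand-in for ChannelMessage.
type message struct {
	Data     []byte
	ClientID string
}

// topicChannel is a hypothetical, simplified channel: a topic, a data
// channel, a done signal, and one inbox per subscribed client.
type topicChannel struct {
	Topic   string
	Data    chan message
	Done    chan struct{}
	inboxes map[string]chan message
}

// handle forwards each message received on Data to every inbox except the
// sender's, and returns once Done is closed.
func (c *topicChannel) handle() {
	for {
		select {
		case <-c.Done:
			return
		case msg := <-c.Data:
			for id, inbox := range c.inboxes {
				if id != msg.ClientID {
					inbox <- msg
				}
			}
		}
	}
}

// roundTrip publishes payload as "pubID" and returns what "subID" receives.
func roundTrip(payload string) string {
	sub := make(chan message, 1)
	c := &topicChannel{
		Topic:   "my-topic",
		Data:    make(chan message),
		Done:    make(chan struct{}),
		inboxes: map[string]chan message{"subID": sub},
	}

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		c.handle()
	}()

	c.Data <- message{Data: []byte(payload), ClientID: "pubID"}
	got := <-sub

	// Closing Done stops the forwarding loop; wait for it to exit.
	close(c.Done)
	wg.Wait()
	return string(got.Data)
}

func main() {
	fmt.Println(roundTrip("my data"))
}
```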

func NewChannel

func NewChannel(topic string) *Channel

func (*Channel) Cleanup

func (c *Channel) Cleanup()

func (*Channel) GetClients

func (c *Channel) GetClients() iter.Seq2[string, *Client]

func (*Channel) Handle

func (c *Channel) Handle()

type ChannelAction

type ChannelAction int

func (ChannelAction) String

func (d ChannelAction) String() string

type ChannelDirection

type ChannelDirection int

const (
	ChannelDirectionInput ChannelDirection = iota
	ChannelDirectionOutput
	ChannelDirectionInputOutput
)

func (ChannelDirection) String

func (d ChannelDirection) String() string
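
As an illustration of how an `iota`-based type with a `String` method behaves, here is a sketch that mirrors the three direction constants. The `direction` type is a local stand-in, and the labels it returns are assumptions; the strings produced by the package's own `String` methods are not shown on this page.

```go
package main

import "fmt"

// direction mirrors the three ChannelDirection constants listed above.
type direction int

const (
	directionInput direction = iota
	directionOutput
	directionInputOutput
)

// String returns a human-readable label. The labels are assumptions made
// for this example, not the package's actual output.
func (d direction) String() string {
	switch d {
	case directionInput:
		return "input"
	case directionOutput:
		return "output"
	case directionInputOutput:
		return "inputoutput"
	default:
		return fmt.Sprintf("direction(%d)", int(d))
	}
}

func main() {
	// fmt uses the String method automatically for Stringer values.
	fmt.Println(directionInput, directionOutput, directionInputOutput)
}
```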

type ChannelMessage

type ChannelMessage struct {
	Data      []byte
	ClientID  string
	Direction ChannelDirection
	Action    ChannelAction
}

type Client

type Client struct {
	ID         string
	ReadWriter io.ReadWriter
	Channels   *syncmap.Map[string, *Channel]
	Direction  ChannelDirection
	Done       chan struct{}
	Data       chan ChannelMessage
	Replay     bool
	BlockWrite bool
	KeepAlive  bool
	// contains filtered or unexported fields
}

Client is the container for holding state between multiple devices. A client has a direction (input, output, or input/output) as well as a way to send data to all the associated channels.

func NewClient

func NewClient(ID string, rw io.ReadWriter, direction ChannelDirection, blockWrite, replay, keepAlive bool) *Client

func (*Client) Cleanup

func (c *Client) Cleanup()

func (*Client) GetChannels

func (c *Client) GetChannels() iter.Seq2[string, *Channel]

type Multicast

type Multicast struct {
	Broker
	Logger *slog.Logger
}

Multicast is a flexible, bidirectional broker.

It provides the purest version of our PubSub interface, which lets end developers build one-to-many connections between publishers and subscribers and vice versa.

It doesn't provide any topic filtering capabilities and is only concerned with sending data to and from an `io.ReadWriter` via our channels.

func NewMulticast

func NewMulticast(logger *slog.Logger) *Multicast

func (*Multicast) GetPipes

func (p *Multicast) GetPipes() iter.Seq2[string, *Client]

func (*Multicast) GetPubs

func (p *Multicast) GetPubs() iter.Seq2[string, *Client]

func (*Multicast) GetSubs

func (p *Multicast) GetSubs() iter.Seq2[string, *Client]

func (*Multicast) Pipe

func (p *Multicast) Pipe(ctx context.Context, ID string, rw io.ReadWriter, channels []*Channel, replay bool) (error, error)

func (*Multicast) Pub

func (p *Multicast) Pub(ctx context.Context, ID string, rw io.ReadWriter, channels []*Channel, blockWrite bool) error

func (*Multicast) Sub

func (p *Multicast) Sub(ctx context.Context, ID string, rw io.ReadWriter, channels []*Channel, keepAlive bool) error

type PubSub

type PubSub interface {
	Broker
	GetPubs() iter.Seq2[string, *Client]
	GetSubs() iter.Seq2[string, *Client]
	GetPipes() iter.Seq2[string, *Client]
	Pipe(ctx context.Context, ID string, rw io.ReadWriter, channels []*Channel, replay bool) (error, error)
	Sub(ctx context.Context, ID string, rw io.ReadWriter, channels []*Channel, keepAlive bool) error
	Pub(ctx context.Context, ID string, rw io.ReadWriter, channels []*Channel, blockWrite bool) error
}

PubSub is our take on a basic publisher and subscriber interface.

It has a few notable requirements:

- Each operation must accept an array of channels
- A way to send, receive, and stream data between clients

PubSub also inherits the properties of a Broker.

Directories

Path
cmd/basic (command)
cmd/example (module)