rx

v0.2.5
Published: Jun 4, 2025 License: MIT Imports: 11 Imported by: 0

README

rx

import "github.com/reactivego/rx"

Package rx provides Reactive Extensions, a powerful API for asynchronous programming in Go, built around observables and operators to process streams of data seamlessly.

Prerequisites

You’ll need Go 1.23 or later, as the implementation depends on language support for generics and iterators.

Observables

In rx, an Observable represents a stream of data that can emit items over time, while an Observer subscribes to it to receive and react to those emissions. This reactive approach enables asynchronous and concurrent operations without blocking execution. Instead of waiting for values to become available, an observer passively listens and responds whenever the observable emits data, errors, or a completion signal.

This page introduces the reactive pattern, explaining what Observables and Observers are and how subscriptions work. Other sections explore the powerful set of Operators that allow you to transform, combine, and control data streams efficiently.

An Observable:

  • is a stream of events
  • assumes zero to many values over time
  • pushes values
  • can take any amount of time to complete (or may never complete)
  • is cancellable
  • is lazy (it doesn't do anything until you subscribe)

Example

package main

import "github.com/reactivego/rx"

func main() {
    rx.From[any](1, "hi", 2.3).Println().Wait()
}

Note that the program creates an observable of type any from mixed values: an int, a string and a float64.

Output

1
hi
2.3

Example

package main

import "github.com/reactivego/rx"

func main() {
    rx.From(1, 2, 3).Println().Wait()
}

Note that the element type of the observable is inferred as int.

Output

1
2
3

Observables in rx offer several advantages over standard Go channels:

Hot vs Cold Observables
  • Hot Observables emit values regardless of subscription status. Like a live broadcast, any values emitted when no subscribers are listening are permanently missed. Examples include system events, mouse movements, or real-time data feeds.

  • Cold Observables begin emission only when subscribed to, ensuring subscribers receive the complete data sequence from the beginning. Examples include file contents, database queries, or HTTP requests that are executed on-demand.
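
The difference can be sketched without the library. The snippet below is a minimal, synchronous illustration using only the standard library; the cold and hot types are hypothetical stand-ins invented for this sketch and are not how rx implements observables.

```go
package main

import "fmt"

// cold is a hypothetical stand-in for a cold source: a function that
// replays the full sequence to whoever calls it, and does nothing until called.
type cold func(next func(int))

// hot is a hypothetical stand-in for a hot source: it pushes each value
// to the listeners registered at that moment and keeps no history.
type hot struct {
	listeners []func(int)
}

func (h *hot) subscribe(next func(int)) { h.listeners = append(h.listeners, next) }

func (h *hot) emit(v int) {
	for _, next := range h.listeners {
		next(v)
	}
}

// collectCold subscribes to a cold source and returns everything it saw.
func collectCold(src cold) []int {
	var seen []int
	src(func(v int) { seen = append(seen, v) })
	return seen
}

// lateHotSubscriber emits 1 before anyone listens, then subscribes and
// emits 2 and 3; it returns what the late subscriber saw.
func lateHotSubscriber() []int {
	h := &hot{}
	h.emit(1) // nobody is listening: this value is lost
	var seen []int
	h.subscribe(func(v int) { seen = append(seen, v) })
	h.emit(2)
	h.emit(3)
	return seen
}

func main() {
	numbers := cold(func(next func(int)) {
		for _, v := range []int{1, 2, 3} {
			next(v)
		}
	})
	fmt.Println(collectCold(numbers)) // first subscriber sees the whole sequence
	fmt.Println(collectCold(numbers)) // so does the second one
	fmt.Println(lateHotSubscriber())  // the late subscriber missed the first value
}
```

Each call on the cold source starts the sequence again from the beginning, while the hot source only reaches listeners that are registered at the moment a value is emitted.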

Rich Lifecycle Management

Observables offer comprehensive lifecycle handling. They can complete normally, terminate with errors, or continue indefinitely. Subscriptions provide fine-grained control, allowing subscribers to cancel at any point, preventing resource leaks and unwanted processing.

Time-Varying Data Model

Unlike traditional variables that represent static values, Observables elegantly model how values evolve over time. They represent the entire progression of a value's state changes, not just its current state, making them ideal for reactive programming paradigms.

Native Concurrency Support

Concurrency is built into the Observable paradigm. Each Observable conceptually operates as an independent process that asynchronously pushes values to subscribers. This approach naturally aligns with concurrent programming models while abstracting away much of the complexity typically associated with managing concurrent operations.

Operators

Operators form a language for expressing programs with Observables. They transform, filter, and combine one or more Observables into new Observables, allowing for powerful data stream processing. Each operator performs a specific function in the reactive pipeline, enabling you to compose complex asynchronous workflows through method chaining.
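
To make the idea of composing operators concrete, here is a small standard-library sketch. The source type and the from, filter, mapTo and collect helpers are hypothetical names made up for this illustration; they only mimic the shape of an operator pipeline and are not the rx API.

```go
package main

import "fmt"

// source is a hypothetical, synchronous stand-in for an observable:
// calling it pushes every value to next.
type source[T any] func(next func(T))

// from builds a source from fixed values.
func from[T any](values ...T) source[T] {
	return func(next func(T)) {
		for _, v := range values {
			next(v)
		}
	}
}

// filter returns a new source that forwards only values passing keep.
func filter[T any](src source[T], keep func(T) bool) source[T] {
	return func(next func(T)) {
		src(func(v T) {
			if keep(v) {
				next(v)
			}
		})
	}
}

// mapTo returns a new source that forwards project(v) for each value v.
func mapTo[T, U any](src source[T], project func(T) U) source[U] {
	return func(next func(U)) {
		src(func(v T) { next(project(v)) })
	}
}

// collect subscribes to src and gathers everything it emits.
func collect[T any](src source[T]) []T {
	var out []T
	src(func(v T) { out = append(out, v) })
	return out
}

func main() {
	even := func(n int) bool { return n%2 == 0 }
	label := func(n int) string { return fmt.Sprintf("item-%d", n) }

	// Each operator wraps the previous source and returns a new one.
	pipeline := mapTo(filter(from(1, 2, 3, 4), even), label)
	fmt.Println(collect(pipeline))
}
```

Every helper takes a source and returns a new source, which is what lets the stages be stacked in any order.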

Index

All converts an Observable stream into a Go 1.23+ iterator sequence that provides each emitted value paired with its sequential zero-based index.

All2 converts an Observable of Tuple2 pairs into a Go 1.23+ iterator sequence that yields each tuple's components (First, Second) as separate values.

Append creates a pipe that appends emitted values to a provided slice while forwarding them to the next observer, with a method variant available for chaining.

AsObservable provides type conversion between observables, allowing you to safely cast an Observable of one type to another, and to convert a typed Observable to an Observable of 'any' type (and vice versa).

AsObserver converts an Observer of type any to an Observer of a specific type T.

Assign stores each emitted value from an Observable into a provided pointer variable while passing all emissions through to the next observer, enabling value capture during stream processing.

AutoConnect makes a (Connectable) Multicaster behave like an ordinary Observable that automatically connects the multicaster to its source when the specified number of observers have subscribed to it.

AutoUnsubscribe

BufferCount

Catch recovers from an error notification by continuing the sequence without emitting the error, switching to the catch Observable to provide items.

CatchError catches errors on the Observable so they can be handled by returning a new Observable or by throwing an error.

CombineAll

CombineLatest combines multiple Observables into one by emitting an array containing the latest values from each source whenever any input Observable emits a value, with variants (CombineLatest2, CombineLatest3, CombineLatest4, CombineLatest5) that return strongly-typed tuples for 2-5 input Observables respectively.

Concat combines multiple Observables sequentially by emitting all values from the first Observable before proceeding to the next one, ensuring emissions never overlap.

ConcatAll transforms a higher-order Observable (an Observable that emits other Observables) into a first-order Observable by subscribing to each inner Observable only after the previous one completes.

ConcatMap projects each source value to an Observable, subscribes to it, and emits its values, waiting for each one to complete before processing the next source value.

ConcatWith extends an Observable by appending additional Observables, ensuring that emissions from each Observable only begin after the previous one completes.

Connectable is an Observable with delayed connection to its source, combining both Observable and Connector interfaces. It separates the subscription process into two parts: observers can register via Subscribe, but the Observable won't subscribe to its source until Connect is explicitly called. This enables multiple observers to subscribe before any emissions begin (multicast behavior), allowing a single source Observable to be efficiently shared among multiple consumers. Besides inheriting all methods from Observable and Connector, Connectable provides the convenience methods AutoConnect and RefCount to manage connection behavior.

Connect establishes a connection to the source Observable and returns a Subscription that can be used to cancel the connection when no longer needed.

Connector provides a mechanism for controlling when a Connectable Observable subscribes to its source, allowing you to connect the Observable independently from when observers subscribe to it. This separation enables multiple subscribers to prepare their subscriptions before the source begins emitting items. It has a single method Connect.

Constraints provides the type constraints Signed, Unsigned, Integer and Float, copied verbatim from golang.org/x/exp so that the dependency on that package could be dropped.

Count returns an Observable that emits a single value representing the total number of items emitted by the source Observable before it completes.

Create constructs a new Observable from a Creator function, providing a bridge between imperative code and the reactive Observable pattern. The Observable will continue producing values until the Creator signals completion, the Observer unsubscribes, or the Creator returns an error.

Creator is a function type that generates values for an Observable stream. It receives a zero-based index for the current iteration and returns a tuple containing the next value to emit, any error that occurred, and a boolean flag indicating whether the sequence is complete.

Defer

Delay

DistinctUntilChanged only emits when the current value is different from the last.

Do calls a function for each next value passing through the observable.

ElementAt emits only item n emitted by an Observable.

Empty creates an Observable that emits no items but terminates normally.

EndWith

Equal

Err

ExhaustAll

ExhaustMap

Filter emits only those items from an observable that pass a predicate test.

First emits only the first item from an Observable.

Fprint

Fprintf

Fprintln

From creates an observable from multiple values passed in.

Go subscribes to the observable and starts execution on a separate goroutine, ignoring all emissions from the observable sequence. This makes it useful when you only care about side effects and not the actual values. Returns a Subscription that can be used to cancel the subscription when no longer needed.

Ignore[T] creates an Observer[T] that simply discards any emissions from an Observable. It is useful when you need to create an Observer but don't care about its values.

Interval creates an Observable that emits a sequence of integers spaced by a particular time interval.

Last emits only the last item emitted by an Observable.

Map transforms the items emitted by an Observable by applying a function to each item.

MapE

Marshal

MaxBufferSizeOption, WithMaxBufferSize

Merge combines multiple Observables into one by merging their emissions.

MergeAll flattens a higher order observable by merging the observables it emits.

MergeMap transforms the items emitted by an Observable by applying a function to each item and returning an Observable.

MergeWith combines multiple Observables into one by merging their emissions.

Multicast

Must

Never creates an Observable that emits no items and doesn't terminate.

Observable

Observer

Of emits a variable amount of values in a sequence and then emits a complete notification.

OnComplete

OnDone

OnError

OnNext

Passthrough just passes through all output from the Observable.

Pipe

Print

Printf

Println subscribes to the Observable and prints every item to os.Stdout.

Publish returns a multicasting Observable[T] for an underlying Observable[T] as a Connectable[T] type.

Pull

Pull2

Race

RaceWith

Recv

Reduce applies a reducer function to each item emitted by an Observable and the previous reducer result.

ReduceE

RefCount makes a Connectable behave like an ordinary Observable.

Repeat creates an observable that emits a sequence of items repeatedly.

Retry resubscribes to a source Observable that sends an error notification, in the hope that it will complete without error.

RetryTime

SampleTime emits the most recent item emitted by an Observable within periodic time intervals.

Scan applies an accumulator function to each item emitted by an Observable and the previous accumulator result.

ScanE

Scheduler

Send

Share

Skip suppresses the first n items emitted by an Observable.

Slice

StartWith returns an observable that, at the moment of subscription, will synchronously emit all values provided to this operator, then subscribe to the source and mirror all of its emissions to subscribers.

Subject is a combination of an observer and observable.

Subscribe operates upon the emissions and notifications from an Observable.

SubscribeOn specifies the scheduler an Observable should use when it is subscribed to.

Subscriber

Subscription

SwitchAll

SwitchMap

Take emits only the first n items emitted by an Observable.

TakeWhile mirrors items emitted by an Observable until a specified condition becomes false.

Tap

Throw creates an observable that emits no items and terminates with an error.

Ticker creates an Observable that emits a sequence of timestamps after an initialDelay has passed.

Timer creates an Observable that emits a sequence of integers (starting at zero) after an initialDelay has passed.

Tuple

Values

Wait subscribes to the Observable and waits for completion or error.

WithLatestFrom will subscribe to all Observables and wait for all of them to emit before emitting the first slice.

WithLatestFromAll flattens a higher order observable.

Zip

ZipAll

Documentation

Overview

Package rx provides Reactive Extensions, a powerful API for asynchronous programming in Go, built around observables and operators to process streams of data seamlessly.

Example (All)
package main

import (
	"fmt"

	"github.com/reactivego/rx"
)

func main() {
	source := rx.From("ZERO", "ONE", "TWO")

	for k, v := range source.All() {
		fmt.Println(k, v)
	}

	fmt.Println("OK")
}
Output:

0 ZERO
1 ONE
2 TWO
OK
Example (AutoConnect)
package main

import (
	"fmt"
	"time"

	"github.com/reactivego/rx"
)

func main() {
	// Create a multicaster hot observable that will emit every 100 milliseconds
	hot := rx.Interval[int](100 * time.Millisecond).Take(10).Publish()
	hotsub := hot.Connect(rx.Goroutine)
	defer hotsub.Unsubscribe()
	fmt.Println("Hot observable created and emitting 0,1,2,3,4,5,6 ...")

	// Publish the hot observable again but only Connect to it when 2
	// subscribers have connected.
	source := hot.Take(5).Publish().AutoConnect(2)

	// First subscriber
	sub1 := source.Printf("Subscriber 1: %d\n").Go()
	fmt.Println("First subscriber connected, waiting a bit...")

	// Wait a bit, nothing will emit yet
	time.Sleep(525 * time.Millisecond)

	fmt.Println("Second subscriber connecting, emissions begin!")
	// Second subscriber triggers the connection
	sub2 := source.Printf("Subscriber 2: %d\n").Go()

	// Wait for emissions to complete
	hotsub.Wait()
	sub1.Wait()
	sub2.Wait()

}
Output:

Hot observable created and emitting 0,1,2,3,4,5,6 ...
First subscriber connected, waiting a bit...
Second subscriber connecting, emissions begin!
Subscriber 1: 5
Subscriber 2: 5
Subscriber 1: 6
Subscriber 2: 6
Subscriber 1: 7
Subscriber 2: 7
Subscriber 1: 8
Subscriber 2: 8
Subscriber 1: 9
Subscriber 2: 9
Example (BufferCount)
package main

import (
	"fmt"

	"github.com/reactivego/rx"
)

func main() {
	source := rx.From(0, 1, 2, 3)

	fmt.Println("BufferCount(From(0, 1, 2, 3), 2, 1)")
	rx.BufferCount(source, 2, 1).Println().Wait()

	fmt.Println("BufferCount(From(0, 1, 2, 3), 2, 2)")
	rx.BufferCount(source, 2, 2).Println().Wait()

	fmt.Println("BufferCount(From(0, 1, 2, 3), 2, 3)")
	rx.BufferCount(source, 2, 3).Println().Wait()

	fmt.Println("BufferCount(From(0, 1, 2, 3), 3, 2)")
	rx.BufferCount(source, 3, 2).Println().Wait()

	fmt.Println("BufferCount(From(0, 1, 2, 3), 6, 6)")
	rx.BufferCount(source, 6, 6).Println().Wait()

	fmt.Println("BufferCount(From(0, 1, 2, 3), 2, 0)")
	rx.BufferCount(source, 2, 0).Println().Wait()
}
Output:

BufferCount(From(0, 1, 2, 3), 2, 1)
[0 1]
[1 2]
[2 3]
[3]
BufferCount(From(0, 1, 2, 3), 2, 2)
[0 1]
[2 3]
BufferCount(From(0, 1, 2, 3), 2, 3)
[0 1]
[3]
BufferCount(From(0, 1, 2, 3), 3, 2)
[0 1 2]
[2 3]
BufferCount(From(0, 1, 2, 3), 6, 6)
[0 1 2 3]
BufferCount(From(0, 1, 2, 3), 2, 0)
[0 1]
Example (ConcatAll)
package main

import (
	"fmt"
	"time"

	"github.com/reactivego/rx"
)

func main() {
	source := rx.Empty[rx.Observable[string]]()
	rx.ConcatAll(source).Wait()

	source = rx.Of(rx.Empty[string]())
	rx.ConcatAll(source).Wait()

	req := func(request string, duration time.Duration) rx.Observable[string] {
		req := rx.From(request + " response")
		if duration == 0 {
			return req
		}
		return req.Delay(duration)
	}

	const ms = time.Millisecond

	req1 := req("first", 10*ms)
	req2 := req("second", 20*ms)
	req3 := req("third", 0*ms)
	req4 := req("fourth", 60*ms)

	source = rx.From(req1).ConcatWith(rx.From(req2, req3, req4).Delay(100 * ms))
	rx.ConcatAll(source).Println().Wait()

	fmt.Println("OK")
}
Output:

first response
second response
third response
fourth response
OK
Example (Count)
package main

import (
	"fmt"

	"github.com/reactivego/rx"
)

func main() {
	source := rx.From(1, 2, 3, 4, 5)

	count := source.Count()
	count.Println().Wait()

	emptySource := rx.Empty[int]()
	emptyCount := emptySource.Count()
	emptyCount.Println().Wait()

	fmt.Println("OK")
}
Output:

5
0
OK
Example (ElementAt)
package main

import (
	"github.com/reactivego/rx"
)

func main() {
	rx.From(0, 1, 2, 3, 4).ElementAt(2).Println().Wait()
}
Output:

2
Example (ExhaustAll)
package main

import (
	"fmt"
	"strconv"
	"time"

	"github.com/reactivego/rx"
)

func main() {
	const ms = time.Millisecond

	stream := func(name string, duration time.Duration, count int) rx.Observable[string] {
		return rx.Map(rx.Timer[int](0*ms, duration), func(next int) string {
			return name + "-" + strconv.Itoa(next)
		}).Take(count)
	}

	streams := []rx.Observable[string]{
		stream("a", 20*ms, 3),
		stream("b", 20*ms, 3),
		stream("c", 20*ms, 3),
		rx.Empty[string](),
	}

	streamofstreams := rx.Map(rx.Timer[int](20*ms, 30*ms, 250*ms, 100*ms).Take(4), func(next int) rx.Observable[string] {
		return streams[next]
	})

	err := rx.ExhaustAll(streamofstreams).Println().Wait()

	if err == nil {
		fmt.Println("success")
	}
}
Output:

a-0
a-1
a-2
c-0
c-1
c-2
success
Example (Marshal)
package main

import (
	"encoding/json"

	"github.com/reactivego/rx"
)

func main() {
	type R struct {
		A string `json:"a"`
		B string `json:"b"`
	}

	b2s := func(data []byte) string { return string(data) }

	rx.Map(rx.Of(R{"Hello", "World"}).Marshal(json.Marshal), b2s).Println().Wait()
}
Output:

{"a":"Hello","b":"World"}
Example (MergeMap)
package main

import (
	"fmt"

	"github.com/reactivego/rx"
)

func main() {
	source := rx.From("https://reactivego.io", "https://github.com/reactivego")

	merged := rx.MergeMap(source, func(next string) rx.Observable[string] {
		fakeFetchData := rx.Of(fmt.Sprintf("content of %q", next))
		return fakeFetchData
	})

	merged.Println().Go().Wait()

}
Output:

content of "https://reactivego.io"
content of "https://github.com/reactivego"
Example (MergeMapSubject)
package main

import (
	"fmt"
	"time"

	"github.com/reactivego/rx"
)

func main() {
	source := rx.From("https://google.com", "https://reactivego.io", "https://github.com/reactivego")

	merged := rx.MergeMap(source, func(next string) rx.Observable[string] {
		fakeFetchData := rx.Of(fmt.Sprintf("content of %q", next))
		return fakeFetchData
	})

	// subject remembers last 2 emits by the observer for an hour.
	observer, subject := rx.Subject[string](time.Hour, 2)
	merged.Tap(observer).Go()

	// Sees all emits by the subject
	subject.Println().Go().Wait()

	// Sees only last 2 emits
	subject.Println().Go().Wait()

	// Sees only last 2 emits
	subject.Println().Go().Wait()

}
Output:

content of "https://google.com"
content of "https://reactivego.io"
content of "https://github.com/reactivego"
content of "https://reactivego.io"
content of "https://github.com/reactivego"
content of "https://reactivego.io"
content of "https://github.com/reactivego"
Example (Multicast)
package main

import (
	"fmt"

	"github.com/reactivego/rx"
)

func main() {
	serial := rx.NewScheduler()

	in, out := rx.Multicast[int](1)

	// Ignore everything before any subscriptions, including the last!
	in.Next(-2)
	in.Next(-1)
	in.Next(0)
	in.Next(1)

	// Schedule the subsequent emits in a loop. This will be the first task to
	// run on the serial scheduler after the subscriptions have been added.
	serial.ScheduleLoop(2, func(index int, again func(next int)) {
		if index < 4 {
			in.Next(index)
			again(index + 1)
		} else {
			in.Done(rx.Err)
		}
	})

	// Add a couple of subscriptions
	sub1 := out.Println().Go(serial)
	sub2 := out.Println().Go(serial)

	// Let the scheduler run and wait for all of its scheduled tasks to finish.
	serial.Wait()
	fmt.Println(sub1.Wait())
	fmt.Println(sub2.Wait())
}
Output:

2
2
3
3
rx
rx
Example (MulticastDrop)
package main

import (
	"fmt"

	"github.com/reactivego/rx"
)

func main() {
	serial := rx.NewScheduler()

	const onBackpressureDrop = -1

	// multicast with backpressure handling set to dropping incoming
	// items that don't fit in the buffer once it has filled up.
	in, out := rx.Multicast[int](1 * onBackpressureDrop)

	// ignore everything before any subscriptions, including the last!
	in.Next(-2)
	in.Next(-1)
	in.Next(0)
	in.Next(1)

	// add a couple of subscriptions
	sub1 := out.Println().Go(serial)
	sub2 := out.Println().Go(serial)

	in.Next(2)      // accepted: buffer not full
	in.Next(3)      // dropped: buffer full
	in.Done(rx.Err) // dropped: buffer full

	serial.Wait()
	fmt.Println(sub1.Wait())
	fmt.Println(sub2.Wait())
}
Output:

2
2
<nil>
<nil>
Example (Race)
package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/reactivego/rx"
)

func main() {
	const ms = time.Millisecond

	req := func(request string, duration time.Duration) rx.Observable[string] {
		return rx.From(request + " response").Delay(duration)
	}

	req1 := req("first", 50*ms)
	req2 := req("second", 10*ms)
	req3 := req("third", 60*ms)

	rx.Race(req1, req2, req3).Println().Wait()

	err := func(text string, duration time.Duration) rx.Observable[int] {
		return rx.Throw[int](errors.New(text + " error")).Delay(duration)
	}

	err1 := err("first", 10*ms)
	err2 := err("second", 20*ms)
	err3 := err("third", 30*ms)

	fmt.Println(rx.Race(err1, err2, err3).Wait(rx.Goroutine))
}
Output:

second response
first error
Example (Retry)
package main

import (
	"fmt"

	"github.com/reactivego/rx"
)

func main() {
	var first error = rx.Err
	a := rx.Create(func(index int) (next int, err error, done bool) {
		if index < 3 {
			return index, nil, false
		}
		err, first = first, nil
		return 0, err, true
	})
	err := a.Retry().Println().Wait()
	fmt.Println(first == nil)
	fmt.Println(err)
}
Output:

0
1
2
0
1
2
true
<nil>
Example (Share)
package main

import (
	"github.com/reactivego/rx"
)

func main() {
	serial := rx.NewScheduler()

	shared := rx.From(1, 2, 3).Share()

	shared.Println().Go(serial)
	shared.Println().Go(serial)
	shared.Println().Go(serial)

	serial.Wait()
}
Output:

1
1
1
2
2
2
3
3
3
Example (Skip)
package main

import (
	"github.com/reactivego/rx"
)

func main() {
	rx.From(1, 2, 3, 4, 5).Skip(2).Println().Wait()
}
Output:

3
4
5
Example (Subject)
package main

import (
	"fmt"

	"github.com/reactivego/rx"
)

func main() {
	serial := rx.NewScheduler()

	// subject collects emits when there are no subscriptions active.
	in, out := rx.Subject[int](0, 1)

	// ignore everything before any subscriptions, except the last because buffer size is 1
	in.Next(-2)
	in.Next(-1)
	in.Next(0)
	in.Next(1)

	// add a couple of subscriptions
	sub1 := out.Println().Go(serial)
	sub2 := out.Println().Go(serial)

	// schedule the subsequent emits on the serial scheduler otherwise these calls
	// will block because the buffer is full.
	// subject will detect usage of scheduler on observable side and use it on the
	// observer side to keep the data flow through the subject going.
	serial.Schedule(func() {
		in.Next(2)
		in.Next(3)
		in.Done(rx.Err)
	})

	serial.Wait()
	fmt.Println(sub1.Wait())
	fmt.Println(sub2.Wait())
}
Output:

1
1
2
2
3
3
rx
rx
Example (SwitchAll)
package main

import (
	"fmt"
	"time"

	"github.com/reactivego/rx"
)

func main() {
	const ms = time.Millisecond

	interval42x4 := rx.Interval[int](42 * ms).Take(4)
	interval16x4 := rx.Interval[int](16 * ms).Take(4)

	err := rx.SwitchAll(rx.Map(interval42x4, func(next int) rx.Observable[int] { return interval16x4 })).Println().Wait(rx.Goroutine)

	if err == nil {
		fmt.Println("success")
	}
}
Output:

0
1
0
1
0
1
0
1
2
3
success
Example (SwitchMap)
package main

import (
	"fmt"
	"time"

	"github.com/reactivego/rx"
)

func main() {
	const ms = time.Millisecond

	webreq := func(request string, duration time.Duration) rx.Observable[string] {
		return rx.From(request + " result").Delay(duration)
	}

	first := webreq("first", 50*ms)
	second := webreq("second", 10*ms)
	latest := webreq("latest", 50*ms)

	switchmap := rx.SwitchMap(rx.Interval[int](20*ms).Take(3), func(i int) rx.Observable[string] {
		switch i {
		case 0:
			return first
		case 1:
			return second
		case 2:
			return latest
		default:
			return rx.Empty[string]()
		}
	})

	err := switchmap.Println().Wait()
	if err == nil {
		fmt.Println("success")
	}
}
Output:

second result
latest result
success
Example (Values)
package main

import (
	"fmt"

	"github.com/reactivego/rx"
	"github.com/reactivego/scheduler"
)

func main() {
	source := rx.From(1, 3, 5)

	// Why choose the Goroutine concurrent scheduler?
	// An observable can actually be at the root of a tree
	// of separately running observables that have their
	// responses merged. The Goroutine scheduler allows
	// these observables to run concurrently.

	// run the observable on 1 or more goroutines
	for i := range source.Values(scheduler.Goroutine) {
		// This is called from a newly created goroutine
		fmt.Println(i)
	}

	// run the observable on the current goroutine
	for i := range source.Values(scheduler.New()) {
		fmt.Println(i)
	}

	fmt.Println("OK")
}
Output:

1
3
5
1
3
5
OK

Constants

This section is empty.

Variables

var Err = errors.New("rx")

Err declares the base error that is joined with every error returned by this package. It serves as the foundation for error types in the reactive extensions library, allowing for error type checking and custom error creation with consistent taxonomy.
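
Because the package errors are built with errors.Join on top of a base error, callers can test for the whole family with errors.Is. The sketch below uses local stand-in variables that mirror the declarations in this section (errBase and errInvalidCount are names chosen for the illustration, not the package's own variables), so it runs with the standard library alone.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins that mirror the declarations listed in this section;
// they are not the package's own variables.
var (
	errBase         = errors.New("rx")
	errInvalidCount = errors.Join(errBase, errors.New("invalid count"))
)

// fromPackage reports whether err has the base error somewhere in its tree.
func fromPackage(err error) bool {
	return errors.Is(err, errBase)
}

func main() {
	wrapped := fmt.Errorf("connect failed: %w", errInvalidCount)

	// Joined directly with the base error.
	fmt.Println(fromPackage(errInvalidCount))
	// Still found when the joined error is wrapped again.
	fmt.Println(fromPackage(wrapped))
	// The specific error can be matched as well.
	fmt.Println(errors.Is(wrapped, errInvalidCount))
	// An unrelated error is not part of the family.
	fmt.Println(fromPackage(errors.New("unrelated")))
}
```

With the real package the same checks would presumably read errors.Is(err, rx.Err) and errors.Is(err, rx.ErrInvalidCount).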

var ErrInvalidCount = errors.Join(Err, errors.New("invalid count"))
var ErrOutOfSubjectSubscriptions = errors.Join(Err, errors.New("out of subject subscriptions"))
var ErrRepeatCountInvalid = errors.Join(Err, errors.New("repeat count invalid"))
var ErrSubscriptionActive = errors.Join(Err, errors.New("subscription active"))

ErrSubscriptionActive is the error returned by Err() when the subscription is still active and has not yet completed or been canceled.

var ErrSubscriptionCanceled = errors.Join(Err, errors.New("subscription canceled"))

ErrSubscriptionCanceled is the error returned by Wait() and Err() when the subscription was canceled by calling Unsubscribe() on the Subscription. This indicates the subscription was terminated by the subscriber rather than by the observable completing normally or with an error.

var ErrTypecastFailed = errors.Join(Err, errors.New("typecast failed"))

ErrTypecastFailed is returned when a type conversion fails during observer operations, typically when using AsObserver() to convert between generic and typed observers.

var ErrZipBufferOverflow = errors.Join(Err, errors.New("zip buffer overflow"))
var Goroutine = scheduler.Goroutine
var NewScheduler = scheduler.New

Functions

func All2 added in v0.2.2

func All2[T, U any](observable Observable[Tuple2[T, U]], scheduler ...Scheduler) iter.Seq2[T, U]

All2 converts an Observable of Tuple2 pairs into an iterator sequence. It transforms an Observable[Tuple2[T, U]] into an iter.Seq2[T, U] that yields each tuple's components. The scheduler parameter is optional and determines the execution context. Note: This method ignores any errors from the observable stream.

func Equal added in v0.2.0

func Equal[T comparable]() func(T, T) bool

func Multicast added in v0.2.0

func Multicast[T any](size int) (Observer[T], Observable[T])

Multicast returns both an Observer and an Observable. The returned Observer is used to send items into the Multicast. The returned Observable is used to subscribe to the Multicast. The Multicast multicasts items sent through the Observer to every Subscriber of the Observable.

size  size of the item buffer, number of items kept to replay to a new Subscriber.

Backpressure handling depends on the sign of the size argument. For positive size the multicast will block when one of the subscribers lets the buffer fill up. For negative size the multicast will drop items on the blocking subscriber, allowing the others to keep on receiving values. For hot observables dropping is preferred.
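
The two backpressure policies can be pictured with an ordinary buffered channel. This is only an analogy under the assumption that a full buffer either blocks the sender or discards the item; the offer helper is hypothetical and says nothing about how Multicast is implemented internally.

```go
package main

import "fmt"

// offer tries to place v in the buffer. With drop set it never blocks:
// it reports false when the buffer is full and the value is discarded.
// Without drop it blocks until a reader frees a slot.
func offer(buffer chan int, v int, drop bool) bool {
	if !drop {
		buffer <- v
		return true
	}
	select {
	case buffer <- v:
		return true
	default:
		return false
	}
}

func main() {
	buffer := make(chan int, 1) // room for a single item

	fmt.Println(offer(buffer, 2, true)) // accepted: buffer not full
	fmt.Println(offer(buffer, 3, true)) // dropped: buffer full
	fmt.Println(<-buffer)               // the slow reader finally takes 2

	// Blocking mode returns at once here because the buffer has room again.
	fmt.Println(offer(buffer, 4, false))
}
```

Dropping keeps a fast producer from being stalled by one slow consumer, which is why it suits hot sources.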

func Must added in v0.2.0

func Must[T any](t T, err error) T

func Subject

func Subject[T any](age time.Duration, capacity ...int) (Observer[T], Observable[T])

Subject returns both an Observer and an Observable. The returned Observer is used to send items into the Subject. The returned Observable is used to subscribe to the Subject. The Subject multicasts items sent through the Observer to every Subscriber of the Observable.

age     max age to keep items in order to replay them to a new Subscriber (0 = no max age).
[size]  size of the item buffer, number of items kept to replay to a new Subscriber.
[cap]   capacity of the item buffer, number of items that can be observed before blocking.
[scap]  capacity of the subscription list, max number of simultaneous subscribers.

Types

type ConcurrentScheduler added in v0.2.0

type ConcurrentScheduler = scheduler.ConcurrentScheduler

type Connectable

type Connectable[T any] struct {
	Observable[T]
	Connector
}

Connectable[T] is an Observable[T] that provides delayed connection to its source. It combines both Observable and Connector interfaces:

  • Subscribe: Allows consumers to register for notifications from this Observable
  • Connect: Triggers the actual subscription to the underlying source Observable

The key feature of Connectable[T] is that it doesn't subscribe to its source until the Connect method is explicitly called, allowing multiple observers to subscribe before the source begins emitting items (multicast behavior).

func (Connectable[T]) AutoConnect added in v0.2.0

func (connectable Connectable[T]) AutoConnect(count int) Observable[T]

AutoConnect returns an Observable that automatically connects to the Connectable source when a specified number of subscribers subscribe to it.

When the specified number of subscribers (count) is reached, the Connectable source is connected, allowing it to start emitting items. The connection is shared among all subscribers. When all subscribers unsubscribe, the connection is terminated.

If count is less than 1, it returns an Observable that emits an ErrInvalidCount error.

func (Connectable[T]) RefCount added in v0.2.0

func (connectable Connectable[T]) RefCount() Observable[T]

RefCount converts a Connectable Observable into a standard Observable that automatically connects when the first subscriber subscribes and disconnects when the last subscriber unsubscribes.

When the first subscriber subscribes to the resulting Observable, it automatically calls Connect() on the source Connectable Observable. The connection is shared among all subscribers. When the last subscriber unsubscribes, the connection is automatically closed.

This is useful for efficiently sharing expensive resources (like network connections) among multiple subscribers.
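
The connect-on-first, disconnect-on-last rule is plain reference counting. A minimal sketch follows, assuming a hypothetical refCounted helper with a connect callback; it illustrates the bookkeeping only and is not the package's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// refCounted is a hypothetical helper that opens a shared connection for
// the first subscriber and closes it when the last one leaves.
type refCounted struct {
	mu         sync.Mutex
	count      int
	connect    func() (disconnect func())
	disconnect func()
}

// subscribe registers one subscriber and returns its unsubscribe function.
func (r *refCounted) subscribe() (unsubscribe func()) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.count++
	if r.count == 1 {
		r.disconnect = r.connect() // first subscriber: connect to the source
	}
	var once sync.Once
	return func() {
		once.Do(func() {
			r.mu.Lock()
			defer r.mu.Unlock()
			r.count--
			if r.count == 0 {
				r.disconnect() // last subscriber gone: release the source
			}
		})
	}
}

// trace runs two subscribers through a refCounted and records the events.
func trace() []string {
	var events []string
	shared := &refCounted{connect: func() func() {
		events = append(events, "connect")
		return func() { events = append(events, "disconnect") }
	}}
	first := shared.subscribe()
	second := shared.subscribe()
	first()
	events = append(events, "one left")
	second()
	return events
}

func main() {
	fmt.Println(trace())
}
```

The sync.Once guard makes a repeated unsubscribe call harmless, so the count cannot drop below zero.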

type Connector added in v0.2.3

type Connector func(Scheduler, Subscriber)

Connector provides the Connect method for a Connectable[T].

func (Connector) Connect added in v0.2.3

func (connect Connector) Connect(schedulers ...Scheduler) Subscription

Connect instructs a Connectable[T] to subscribe to its source and begin emitting items to its subscribers. Connect accepts an optional scheduler argument.

type Creator added in v0.2.0

type Creator[T any] func(index int) (Next T, Err error, Done bool)

Creator[T] is a function type that generates values for an Observable stream.

The Creator function receives a zero-based index for the current iteration and returns a tuple containing:

  • Next: The next value to emit (of type T)
  • Err: Any error that occurred during value generation
  • Done: Boolean flag indicating whether the sequence is complete

When Done is true, the Observable will complete after emitting any provided error. When Err is non-nil, the Observable will emit the error and then complete.
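
One way to read this contract is as a loop that keeps asking for the next value. The sketch below is a hypothetical driver written for illustration, not the code behind Create. It assumes that the value returned together with an error or with Done set to true is not emitted, which matches the Retry example on this page, where returning 0 with done set prints no extra 0.

```go
package main

import (
	"errors"
	"fmt"
)

// creator mirrors the shape of the Creator type shown above.
type creator[T any] func(index int) (next T, err error, done bool)

// drain is a hypothetical driver: it calls create with increasing indexes,
// collects each value, and stops on the first error or done signal.
// The value returned alongside an error or done is not collected.
func drain[T any](create creator[T]) ([]T, error) {
	var values []T
	for index := 0; ; index++ {
		next, err, done := create(index)
		if err != nil || done {
			return values, err
		}
		values = append(values, next)
	}
}

// squares emits 0, 1 and 4 and then signals completion.
func squares(index int) (int, error, bool) {
	if index < 3 {
		return index * index, nil, false
	}
	return 0, nil, true
}

// failing emits one value and then reports an error.
func failing(index int) (string, error, bool) {
	if index == 0 {
		return "ok", nil, false
	}
	return "", errors.New("boom"), true
}

func main() {
	fmt.Println(drain[int](squares))
	fmt.Println(drain[string](failing))
}
```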

type Float added in v0.2.2

type Float interface {
	~float32 | ~float64
}

Float is a constraint that permits any floating-point type. If future releases of Go add new predeclared floating-point types, this constraint will be modified to include them.

type Integer added in v0.2.2

type Integer interface {
	Signed | Unsigned
}

Integer is a constraint that permits any integer type. If future releases of Go add new predeclared integer types, this constraint will be modified to include them.
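
To show what these constraints admit, here is a small stand-alone sketch with local copies of the constraint declarations from this page. `Celsius` and `sequence` are hypothetical names; the sketch only demonstrates the `T Integer | Float` type parameter form that `Interval` and `Timer` declare, not what those functions emit.

```go
package main

import "fmt"

// The four constraints below are copied from the declarations on this page.
type Signed interface {
	~int | ~int8 | ~int16 | ~int32 | ~int64
}

type Unsigned interface {
	~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 | ~uintptr
}

type Integer interface{ Signed | Unsigned }

type Float interface{ ~float32 | ~float64 }

// Celsius is a hypothetical named type. The ~ in the constraints admits it
// because its underlying type is float64.
type Celsius float64

// sequence is a hypothetical helper that returns 0, 1, ..., n-1 converted to
// any numeric type T accepted by Integer | Float.
func sequence[T Integer | Float](n int) []T {
	values := make([]T, 0, n)
	for i := 0; i < n; i++ {
		values = append(values, T(i))
	}
	return values
}

func main() {
	fmt.Println(sequence[int](3))     // [0 1 2]
	fmt.Println(sequence[float64](3)) // [0 1 2]
	fmt.Println(sequence[Celsius](2)) // [0 1]
}
```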

type MaxBufferSizeOption added in v0.2.1

type MaxBufferSizeOption = func(*int)

MaxBufferSizeOption is a function type used for configuring the maximum buffer size of an observable stream.

func WithMaxBufferSize added in v0.2.1

func WithMaxBufferSize(n int) MaxBufferSizeOption

WithMaxBufferSize creates a MaxBufferSizeOption that sets the maximum buffer size to n. This option is typically used when creating new observables to control memory usage.
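
The option follows the functional-options pattern. The stand-alone sketch below re-declares the two documented names locally; the body of `WithMaxBufferSize` and the `resolveBufferSize` helper are assumptions about how such an option is typically applied, not the library's code.

```go
package main

import "fmt"

// MaxBufferSizeOption mirrors the alias documented above.
type MaxBufferSizeOption = func(*int)

// WithMaxBufferSize has the documented signature. The body is an assumption:
// it overwrites the integer the option is applied to.
func WithMaxBufferSize(n int) MaxBufferSizeOption {
	return func(size *int) { *size = n }
}

// resolveBufferSize is a hypothetical helper that shows how an operator
// accepting options ...MaxBufferSizeOption could compute its buffer limit.
func resolveBufferSize(defaultSize int, options ...MaxBufferSizeOption) int {
	size := defaultSize
	for _, option := range options {
		option(&size) // later options override earlier ones
	}
	return size
}

func main() {
	fmt.Println(resolveBufferSize(0))                        // 0
	fmt.Println(resolveBufferSize(0, WithMaxBufferSize(64))) // 64
}
```

With rx imported, the option would be passed as the trailing argument of operators that accept it, for example `Zip2(first, second, WithMaxBufferSize(64))`.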

type Observable

type Observable[T any] func(Observer[T], Scheduler, Subscriber)

func AsObservable added in v0.2.0

func AsObservable[T any](observable Observable[any]) Observable[T]

func BufferCount added in v0.2.0

func BufferCount[T any](observable Observable[T], bufferSize, startBufferEvery int) Observable[[]T]

func CombineAll added in v0.2.0

func CombineAll[T any](observable Observable[Observable[T]]) Observable[[]T]

func CombineLatest

func CombineLatest[T any](observables ...Observable[T]) Observable[[]T]

func CombineLatest2 added in v0.2.1

func CombineLatest2[T, U any](first Observable[T], second Observable[U]) Observable[Tuple2[T, U]]

func CombineLatest3 added in v0.2.1

func CombineLatest3[T, U, V any](first Observable[T], second Observable[U], third Observable[V]) Observable[Tuple3[T, U, V]]

func CombineLatest4 added in v0.2.1

func CombineLatest4[T, U, V, W any](first Observable[T], second Observable[U], third Observable[V], fourth Observable[W]) Observable[Tuple4[T, U, V, W]]

func CombineLatest5 added in v0.2.1

func CombineLatest5[T, U, V, W, X any](first Observable[T], second Observable[U], third Observable[V], fourth Observable[W], fifth Observable[X]) Observable[Tuple5[T, U, V, W, X]]

func Concat

func Concat[T any](observables ...Observable[T]) Observable[T]

func ConcatAll added in v0.2.0

func ConcatAll[T any](observable Observable[Observable[T]]) Observable[T]

func ConcatMap added in v0.2.0

func ConcatMap[T, U any](observable Observable[T], project func(T) Observable[U]) Observable[U]

func Create

func Create[T any](create Creator[T]) Observable[T]

Create constructs a new Observable from a Creator function.

The Creator function is called repeatedly with an incrementing index value, and returns a tuple of (next value, error, done flag). The Observable will continue producing values until either:

  1. The Creator signals completion by returning done=true
  2. The Observer unsubscribes
  3. The Creator returns an error (which will be emitted with done=true)

This function provides a bridge between imperative code and the reactive Observable pattern.

func Defer

func Defer[T any](factory func() Observable[T]) Observable[T]

func Empty

func Empty[T any]() Observable[T]

func ExhaustAll added in v0.2.0

func ExhaustAll[T any](observable Observable[Observable[T]]) Observable[T]

func ExhaustMap added in v0.2.0

func ExhaustMap[T, U any](observable Observable[T], project func(T) Observable[U]) Observable[U]

func From

func From[T any](slice ...T) Observable[T]

func Interval

func Interval[T Integer | Float](interval time.Duration) Observable[T]

func Map added in v0.2.0

func Map[T, U any](observable Observable[T], project func(T) U) Observable[U]

func MapE added in v0.2.0

func MapE[T, U any](observable Observable[T], project func(T) (U, error)) Observable[U]

func Merge

func Merge[T any](observables ...Observable[T]) Observable[T]

func MergeAll added in v0.2.0

func MergeAll[T any](observable Observable[Observable[T]]) Observable[T]

func MergeMap added in v0.2.0

func MergeMap[T, U any](observable Observable[T], project func(T) Observable[U]) Observable[U]

func Never

func Never[T any]() Observable[T]

func Of

func Of[T any](value T) Observable[T]

func Pull added in v0.2.2

func Pull[T any](seq iter.Seq[T]) Observable[T]

func Pull2 added in v0.2.2

func Pull2[T, U any](seq iter.Seq2[T, U]) Observable[Tuple2[T, U]]

func Race added in v0.2.0

func Race[T any](observables ...Observable[T]) Observable[T]

func Recv added in v0.2.0

func Recv[T any](ch <-chan T) Observable[T]

func Reduce added in v0.2.0

func Reduce[T, U any](observable Observable[T], seed U, accumulator func(acc U, next T) U) Observable[U]

func ReduceE added in v0.2.1

func ReduceE[T, U any](observable Observable[T], seed U, accumulator func(acc U, next T) (U, error)) Observable[U]

func Scan added in v0.2.0

func Scan[T, U any](observable Observable[T], seed U, accumulator func(acc U, next T) U) Observable[U]

func ScanE added in v0.2.0

func ScanE[T, U any](observable Observable[T], seed U, accumulator func(acc U, next T) (U, error)) Observable[U]

func SwitchAll added in v0.2.0

func SwitchAll[T any](observable Observable[Observable[T]]) Observable[T]

func SwitchMap added in v0.2.0

func SwitchMap[T, U any](o Observable[T], project func(T) Observable[U]) Observable[U]

func Throw

func Throw[T any](err error) Observable[T]

func Ticker

func Ticker(initialDelay time.Duration, intervals ...time.Duration) Observable[time.Time]

Ticker creates an Observable[time.Time] that emits a sequence of timestamps after an initialDelay has passed. Subsequent timestamps are emitted using a schedule of intervals passed in. If only the initialDelay is given, Ticker will emit only once.

func Timer

func Timer[T Integer | Float](initialDelay time.Duration, intervals ...time.Duration) Observable[T]

func WithLatestFrom added in v0.2.0

func WithLatestFrom[T any](observables ...Observable[T]) Observable[[]T]

func WithLatestFrom2 added in v0.2.1

func WithLatestFrom2[T, U any](first Observable[T], second Observable[U]) Observable[Tuple2[T, U]]

func WithLatestFrom3 added in v0.2.1

func WithLatestFrom3[T, U, V any](first Observable[T], second Observable[U], third Observable[V]) Observable[Tuple3[T, U, V]]

func WithLatestFrom4 added in v0.2.1

func WithLatestFrom4[T, U, V, W any](first Observable[T], second Observable[U], third Observable[V], fourth Observable[W]) Observable[Tuple4[T, U, V, W]]

func WithLatestFrom5 added in v0.2.1

func WithLatestFrom5[T, U, V, W, X any](first Observable[T], second Observable[U], third Observable[V], fourth Observable[W], fifth Observable[X]) Observable[Tuple5[T, U, V, W, X]]

func WithLatestFromAll added in v0.2.0

func WithLatestFromAll[T any](observable Observable[Observable[T]]) Observable[[]T]

func Zip added in v0.2.1

func Zip[T any](observables ...Observable[T]) Observable[[]T]

func Zip2 added in v0.2.1

func Zip2[T, U any](first Observable[T], second Observable[U], options ...MaxBufferSizeOption) Observable[Tuple2[T, U]]

func Zip3 added in v0.2.1

func Zip3[T, U, V any](first Observable[T], second Observable[U], third Observable[V], options ...MaxBufferSizeOption) Observable[Tuple3[T, U, V]]

func Zip4 added in v0.2.1

func Zip4[T, U, V, W any](first Observable[T], second Observable[U], third Observable[V], fourth Observable[W], options ...MaxBufferSizeOption) Observable[Tuple4[T, U, V, W]]

func Zip5 added in v0.2.1

func Zip5[T, U, V, W, X any](first Observable[T], second Observable[U], third Observable[V], fourth Observable[W], fifth Observable[X], options ...MaxBufferSizeOption) Observable[Tuple5[T, U, V, W, X]]

func ZipAll added in v0.2.1

func ZipAll[T any](observable Observable[Observable[T]], options ...MaxBufferSizeOption) Observable[[]T]

func (Observable[T]) All

func (observable Observable[T]) All(scheduler ...Scheduler) iter.Seq2[int, T]

All converts an Observable stream into an iterator sequence that pairs each element with its index. It returns an iter.Seq2[int, T] which yields each element along with its position in the sequence. The scheduler parameter is optional and determines the execution context. Note: This method ignores any errors from the observable stream.

func (Observable[T]) Append added in v0.2.1

func (observable Observable[T]) Append(slice *[]T) Observable[T]

Append is a method variant of the Append function that appends each emitted value to the provided slice while forwarding all emissions to downstream operators. This is a convenience method that calls the standalone Append function.

func (Observable[T]) AsObservable

func (observable Observable[T]) AsObservable() Observable[any]

func (Observable[T]) Assign added in v0.2.0

func (observable Observable[T]) Assign(value *T) Observable[T]

Assign is the method version of the Assign function. It stores each emitted value (every emission where done is false) in the provided variable.

func (Observable[T]) AutoUnsubscribe

func (observable Observable[T]) AutoUnsubscribe() Observable[T]

func (Observable[T]) Catch

func (observable Observable[T]) Catch(other Observable[T]) Observable[T]

func (Observable[T]) CatchError

func (observable Observable[T]) CatchError(selector func(err error, caught Observable[T]) Observable[T]) Observable[T]

func (Observable[T]) ConcatWith

func (observable Observable[T]) ConcatWith(others ...Observable[T]) Observable[T]

func (Observable[T]) Count

func (observable Observable[T]) Count() Observable[int]

func (Observable[T]) Delay

func (observable Observable[T]) Delay(duration time.Duration) Observable[T]

func (Observable[T]) DistinctUntilChanged

func (observable Observable[T]) DistinctUntilChanged(equal func(T, T) bool) Observable[T]

func (Observable[T]) Do

func (observable Observable[T]) Do(f func(T)) Observable[T]

func (Observable[T]) ElementAt

func (observable Observable[T]) ElementAt(n int) Observable[T]

func (Observable[T]) EndWith added in v0.2.2

func (observable Observable[T]) EndWith(values ...T) Observable[T]

func (Observable[T]) Filter

func (observable Observable[T]) Filter(predicate func(T) bool) Observable[T]

func (Observable[T]) First

func (observable Observable[T]) First(schedulers ...Scheduler) (value T, err error)

func (Observable[T]) Fprint added in v0.2.0

func (observable Observable[T]) Fprint(out io.Writer) Observable[T]

func (Observable[T]) Fprintf added in v0.2.0

func (observable Observable[T]) Fprintf(out io.Writer, format string) Observable[T]

func (Observable[T]) Fprintln added in v0.2.0

func (observable Observable[T]) Fprintln(out io.Writer) Observable[T]

func (Observable[T]) Go added in v0.2.0

func (observable Observable[T]) Go(schedulers ...Scheduler) Subscription

Go subscribes to the observable and starts execution on a separate goroutine. It ignores all emissions from the observable sequence, making it useful when you only care about side effects and not the actual values. By default, it uses the Goroutine scheduler, but an optional scheduler can be provided. Returns a Subscription that can be used to cancel the subscription when no longer needed.

func (Observable[T]) Last

func (observable Observable[T]) Last(schedulers ...Scheduler) (value T, err error)

func (Observable[T]) Map

func (observable Observable[T]) Map(project func(T) any) Observable[any]

func (Observable[T]) MapE added in v0.2.1

func (observable Observable[T]) MapE(project func(T) (any, error)) Observable[any]

func (Observable[T]) Marshal added in v0.2.0

func (observable Observable[T]) Marshal(marshal func(any) ([]byte, error)) Observable[[]byte]

func (Observable[T]) MergeWith

func (observable Observable[T]) MergeWith(others ...Observable[T]) Observable[T]

func (Observable[T]) OnComplete added in v0.2.2

func (observable Observable[T]) OnComplete(f func()) Observable[T]

func (Observable[T]) OnDone added in v0.2.2

func (observable Observable[T]) OnDone(f func(error)) Observable[T]

func (Observable[T]) OnError added in v0.2.2

func (observable Observable[T]) OnError(f func(error)) Observable[T]

func (Observable[T]) OnNext added in v0.2.2

func (observable Observable[T]) OnNext(f func(T)) Observable[T]

func (Observable[T]) Passthrough added in v0.2.0

func (observable Observable[T]) Passthrough() Observable[T]

func (Observable[T]) Pipe added in v0.2.0

func (observable Observable[T]) Pipe(segments ...Pipe[T]) Observable[T]

func (Observable[T]) Print added in v0.2.0

func (observable Observable[T]) Print() Observable[T]

func (Observable[T]) Printf added in v0.2.0

func (observable Observable[T]) Printf(format string) Observable[T]

func (Observable[T]) Println

func (observable Observable[T]) Println() Observable[T]

func (Observable[T]) Publish

func (observable Observable[T]) Publish() Connectable[T]

Publish returns a multicasting Observable[T] for an underlying Observable[T] as a Connectable[T] type.

func (Observable[T]) RaceWith added in v0.2.0

func (observable Observable[T]) RaceWith(others ...Observable[T]) Observable[T]

func (Observable[T]) Repeat

func (observable Observable[T]) Repeat(count ...int) Observable[T]

Repeat emits the items emitted by the source Observable repeatedly.

Parameters:

  • count: Optional. The number of repetitions:
      • If omitted: The source Observable is repeated indefinitely
      • If 0: Returns an empty Observable
      • If negative: Returns an Observable that emits an error
      • If multiple count values: Returns an Observable that emits an error

The resulting Observable will subscribe to the source Observable repeatedly each time the source completes, up to the specified count.

func (Observable[T]) Retry

func (observable Observable[T]) Retry(limit ...int) Observable[T]

func (Observable[T]) RetryTime added in v0.2.0

func (observable Observable[T]) RetryTime(backoff func(int) time.Duration, limit ...int) Observable[T]

func (Observable[T]) SampleTime

func (observable Observable[T]) SampleTime(window time.Duration) Observable[T]

SampleTime emits the most recent item emitted by an Observable within periodic time intervals.

func (Observable[T]) Send added in v0.2.0

func (observable Observable[T]) Send(ch chan<- T) Observable[T]

func (Observable[T]) Share added in v0.2.0

func (observable Observable[T]) Share() Observable[T]

Share returns a new Observable that multicasts (shares) the original Observable. As long as there is at least one Subscriber this Observable will be subscribed and emitting data. When all subscribers have unsubscribed it will unsubscribe from the source Observable. Because the Observable is multicasting it makes the stream hot.

This method is useful when you have an Observable that is expensive to create or has side-effects, but you want to share the results of that Observable with multiple subscribers. By using `Share`, you can avoid creating multiple instances of the Observable and ensure that all subscribers receive the same data.

func (Observable[T]) Skip

func (observable Observable[T]) Skip(n int) Observable[T]

func (Observable[T]) Slice added in v0.2.0

func (observable Observable[T]) Slice(schedulers ...Scheduler) (slice []T, err error)

func (Observable[T]) StartWith

func (observable Observable[T]) StartWith(values ...T) Observable[T]

func (Observable[T]) Subscribe

func (observable Observable[T]) Subscribe(observe Observer[T], scheduler Scheduler) Subscription

func (Observable[T]) SubscribeOn

func (observable Observable[T]) SubscribeOn(scheduler ConcurrentScheduler) Observable[T]

func (Observable[T]) Take

func (observable Observable[T]) Take(n int) Observable[T]

Take returns an Observable that emits only the first n values emitted by the source Observable. If the source emits fewer than n values, all of its values are emitted. The resulting Observable then completes, regardless of whether the source has completed.

func (Observable[T]) TakeWhile

func (observable Observable[T]) TakeWhile(condition func(T) bool) Observable[T]

func (Observable[T]) Tap added in v0.2.0

func (observable Observable[T]) Tap(tap Observer[T]) Observable[T]

func (Observable[T]) Values added in v0.2.0

func (observable Observable[T]) Values(scheduler ...Scheduler) iter.Seq[T]

func (Observable[T]) Wait

func (observable Observable[T]) Wait(schedulers ...Scheduler) error

type Observer

type Observer[T any] func(next T, err error, done bool)

Observer[T] represents a consumer of values delivered by an Observable. It is implemented as a function that takes three parameters:

  • next: the next value emitted by the Observable
  • err: any error that occurred during emission (nil if no error)
  • done: a boolean indicating whether the Observable has completed

Observers follow the reactive pattern by receiving a stream of events (values, errors, or completion signals) and reacting to them accordingly.
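
A minimal stand-alone sketch of an observer function follows. It declares a local copy of the `Observer` type; the bodies of `Next` and `Done` are assumptions based on their descriptions on this page, and `recorder` is a hypothetical type used only to capture what the observer receives.

```go
package main

import (
	"errors"
	"fmt"
)

// Observer mirrors the function type documented above.
type Observer[T any] func(next T, err error, done bool)

// Next and Done are assumed implementations of the documented convenience
// methods: Next delivers a value, Done delivers the terminal signal.
func (observe Observer[T]) Next(next T) { observe(next, nil, false) }

func (observe Observer[T]) Done(err error) {
	var zero T
	observe(zero, err, true)
}

// recorder is a hypothetical consumer that stores everything it is sent.
type recorder struct {
	values []int
	err    error
	done   bool
}

// observe has the Observer[int] signature. It checks done first because the
// value carries no meaning once the stream has terminated.
func (r *recorder) observe(next int, err error, done bool) {
	if done {
		r.err, r.done = err, true
		return
	}
	r.values = append(r.values, next)
}

func main() {
	var rec recorder
	observe := Observer[int](rec.observe)

	observe.Next(1)
	observe.Next(2)
	observe.Done(errors.New("boom"))

	fmt.Println(rec.values, rec.done, rec.err) // [1 2] true boom
}
```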

func AsObserver added in v0.2.0

func AsObserver[T any](observe Observer[any]) Observer[T]

AsObserver converts an Observer of type `any` to an Observer of a specific type T. This allows adapting a generic Observer to a more specific type context.

func Ignore added in v0.2.0

func Ignore[T any]() Observer[T]

Ignore creates an Observer that simply discards any emissions from an Observable. It is useful when you need to create an Observer but don't care about its values.

func (Observer[T]) AsObserver

func (observe Observer[T]) AsObserver() Observer[any]

AsObserver converts a typed Observer[T] to a generic Observer[any]. It handles type conversion from 'any' back to T, and will emit an ErrTypecastFailed error when conversion fails.

func (Observer[T]) Done added in v0.2.2

func (observe Observer[T]) Done(err error)

Done signals that the Observable has completed emitting values, optionally with an error. If err is nil, it indicates normal completion. If err is non-nil, it indicates that the Observable terminated with an error.

After Done is called, the Observable will not emit any more values, regardless of whether the completion was successful or due to an error.

func (Observer[T]) Next

func (observe Observer[T]) Next(next T)

Next sends a new value to the Observer. This is a convenience method that handles the common case of emitting a new value without errors or completion signals.

type Pipe added in v0.2.0

type Pipe[T any] func(Observable[T]) Observable[T]

func Append added in v0.2.1

func Append[T any](slice *[]T) Pipe[T]

Append creates a pipe that appends each emitted value to the provided slice. It passes each value through to the next observer after appending it. This allows collecting all emitted values in a slice while still forwarding them. Only values emitted before completion (done=false) are appended.

func Assign added in v0.2.0

func Assign[T any](value *T) Pipe[T]

Assign creates a pipe that stores each emitted value (every emission where done is false) in the provided variable. The pipe forwards all events (next, err, done) to the next observer.

func AutoUnsubscribe added in v0.2.2

func AutoUnsubscribe[T any]() Pipe[T]

func Catch added in v0.2.0

func Catch[T any](other Observable[T]) Pipe[T]

func CatchError added in v0.2.0

func CatchError[T any](selector func(err error, caught Observable[T]) Observable[T]) Pipe[T]

func ConcatWith added in v0.2.1

func ConcatWith[T any](others ...Observable[T]) Pipe[T]

func Delay added in v0.2.1

func Delay[T any](duration time.Duration) Pipe[T]

func DistinctUntilChanged added in v0.2.0

func DistinctUntilChanged[T any](equal func(T, T) bool) Pipe[T]

func Do added in v0.2.0

func Do[T any](do func(T)) Pipe[T]

func ElementAt added in v0.2.1

func ElementAt[T any](n int) Pipe[T]

func EndWith added in v0.2.2

func EndWith[T any](values ...T) Pipe[T]

func Filter added in v0.2.0

func Filter[T any](predicate func(T) bool) Pipe[T]

func Fprint added in v0.2.0

func Fprint[T any](out io.Writer) Pipe[T]

func Fprintf added in v0.2.0

func Fprintf[T any](out io.Writer, format string) Pipe[T]

func Fprintln added in v0.2.0

func Fprintln[T any](out io.Writer) Pipe[T]

func MergeWith added in v0.2.1

func MergeWith[T any](others ...Observable[T]) Pipe[T]

func OnComplete added in v0.2.1

func OnComplete[T any](onComplete func()) Pipe[T]

func OnDone added in v0.2.1

func OnDone[T any](onDone func(error)) Pipe[T]

func OnError added in v0.2.1

func OnError[T any](onError func(error)) Pipe[T]

func OnNext added in v0.2.1

func OnNext[T any](onNext func(T)) Pipe[T]

func Passthrough added in v0.2.0

func Passthrough[T any]() Pipe[T]

func Print added in v0.2.0

func Print[T any]() Pipe[T]

func Printf added in v0.2.0

func Printf[T any](format string) Pipe[T]

func Println

func Println[T any]() Pipe[T]

func RaceWith added in v0.2.1

func RaceWith[T any](others ...Observable[T]) Pipe[T]

func Repeat added in v0.2.2

func Repeat[T any](count ...int) Pipe[T]

Repeat creates an Observable that emits the entire source sequence multiple times.

Parameters:

  • count: Optional. The number of repetitions:
      • If omitted: The source Observable is repeated indefinitely
      • If 0: Returns an empty Observable
      • If negative: Returns an Observable that emits an error
      • If multiple count values: Returns an Observable that emits an error

The resulting Observable will subscribe to the source Observable repeatedly each time the source completes, up to the specified count.

func Retry added in v0.2.2

func Retry[T any](limit ...int) Pipe[T]

func Send added in v0.2.0

func Send[T any](ch chan<- T) Pipe[T]

func Skip added in v0.2.0

func Skip[T any](n int) Pipe[T]

func StartWith added in v0.2.2

func StartWith[T any](values ...T) Pipe[T]

func Take added in v0.2.0

func Take[T any](n int) Pipe[T]

Take creates a pipe that passes through only the first n values emitted by the source Observable. If the source emits fewer than n values, all of its values are passed through. The resulting Observable then completes, regardless of whether the source has completed.

func TakeWhile added in v0.2.0

func TakeWhile[T any](condition func(T) bool) Pipe[T]

func Tap added in v0.2.0

func Tap[T any](tap Observer[T]) Pipe[T]

type Scheduler

type Scheduler = scheduler.Scheduler

type Signed added in v0.2.2

type Signed interface {
	~int | ~int8 | ~int16 | ~int32 | ~int64
}

Signed is a constraint that permits any signed integer type. If future releases of Go add new predeclared signed integer types, this constraint will be modified to include them.

type Subscriber

type Subscriber interface {
	// Subscribed returns true if the subscriber is in a subscribed state.
	// Returns false once Unsubscribe has been called.
	Subscribed() bool

	// Unsubscribe changes the state to unsubscribed and executes all registered
	// callback functions. Does nothing if already unsubscribed.
	Unsubscribe()

	// Add creates and returns a new child Subscriber.
	// If the parent is already unsubscribed, the child will be created in an
	// unsubscribed state. Otherwise, the child will be unsubscribed when the parent
	// is unsubscribed.
	Add() Subscriber

	// OnUnsubscribe registers a callback function to be executed when Unsubscribe is called.
	// If the subscriber is already unsubscribed, the callback is executed immediately.
	// If callback is nil, this method does nothing.
	OnUnsubscribe(callback func())
}

Subscriber is a subscribable entity that allows construction of a Subscriber tree.
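
The tree behaviour spelled out in the interface comments can be illustrated with a small stand-alone implementation. The `node` type below is hypothetical and is not the implementation rx ships; it only follows the documented rules for `Add`, `Unsubscribe`, and `OnUnsubscribe`.

```go
package main

import (
	"fmt"
	"sync"
)

// Subscriber mirrors the interface documented above.
type Subscriber interface {
	Subscribed() bool
	Unsubscribe()
	Add() Subscriber
	OnUnsubscribe(callback func())
}

// node is a hypothetical, minimal Subscriber used for illustration only.
type node struct {
	mu        sync.Mutex
	done      bool
	callbacks []func()
}

func (n *node) Subscribed() bool {
	n.mu.Lock()
	defer n.mu.Unlock()
	return !n.done
}

// Unsubscribe flips the state once and then runs the registered callbacks.
func (n *node) Unsubscribe() {
	n.mu.Lock()
	if n.done {
		n.mu.Unlock()
		return
	}
	n.done = true
	callbacks := n.callbacks
	n.callbacks = nil
	n.mu.Unlock()
	for _, callback := range callbacks {
		callback()
	}
}

// Add creates a child whose Unsubscribe is registered on the parent, so the
// child ends when the parent ends (immediately, if the parent already has).
func (n *node) Add() Subscriber {
	child := &node{}
	n.OnUnsubscribe(child.Unsubscribe)
	return child
}

func (n *node) OnUnsubscribe(callback func()) {
	if callback == nil {
		return
	}
	n.mu.Lock()
	if n.done {
		n.mu.Unlock()
		callback() // already unsubscribed: run right away
		return
	}
	n.callbacks = append(n.callbacks, callback)
	n.mu.Unlock()
}

func main() {
	parent := &node{}
	child := parent.Add()
	child.OnUnsubscribe(func() { fmt.Println("child released") })

	parent.Unsubscribe()                                // prints "child released"
	fmt.Println(parent.Subscribed(), child.Subscribed()) // false false
	fmt.Println(parent.Add().Subscribed())               // false
}
```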

type Subscription

type Subscription interface {
	// Subscribed returns true until Unsubscribe is called.
	Subscribed() bool

	// Unsubscribe will change the state to unsubscribed.
	Unsubscribe()

	// Done returns a channel that is closed when the subscription state changes to unsubscribed.
	// This channel can be used with select statements to react to subscription termination events.
	// If the scheduler is not concurrent, it will spawn a goroutine to wait for the scheduler.
	Done() <-chan struct{}

	// Err returns the subscription's terminal state:
	// - nil if the observable completed successfully
	// - the observable's error if it terminated with an error
	// - SubscriptionCanceled if the subscription was manually unsubscribed
	// - SubscriptionActive if the subscription is still active
	Err() error

	// Wait blocks until the subscription state becomes unsubscribed.
	// If the subscription is already unsubscribed, it returns immediately.
	// If the scheduler is not concurrent, it will wait for the scheduler to complete.
	// Returns:
	// - nil if the observable completed successfully
	// - the observable's error if it terminated with an error
	// - SubscriptionCanceled if the subscription was manually unsubscribed
	Wait() error
}

Subscription is an interface that allows monitoring and controlling a subscription. It provides methods for tracking the subscription's lifecycle.

type Tuple2 added in v0.2.1

type Tuple2[T, U any] struct {
	First  T
	Second U
}

type Tuple3 added in v0.2.1

type Tuple3[T, U, V any] struct {
	First  T
	Second U
	Third  V
}

type Tuple4 added in v0.2.1

type Tuple4[T, U, V, W any] struct {
	First  T
	Second U
	Third  V
	Fourth W
}

type Tuple5 added in v0.2.1

type Tuple5[T, U, V, W, X any] struct {
	First  T
	Second U
	Third  V
	Fourth W
	Fifth  X
}

type Unsigned added in v0.2.2

type Unsigned interface {
	~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 | ~uintptr
}

Unsigned is a constraint that permits any unsigned integer type. If future releases of Go add new predeclared unsigned integer types, this constraint will be modified to include them.
