Documentation
¶
Overview ¶
Package rx provides Reactive Extensions, a powerful API for asynchronous programming in Go, built around observables and operators to process streams of data seamlessly.
Example (All) ¶
package main
import (
"fmt"
"github.com/reactivego/rx"
)
// main ranges over an observable as an index/value sequence, printing each
// pair on its own line followed by a final "OK".
func main() {
	words := rx.From("ZERO", "ONE", "TWO")
	for index, word := range words.All() {
		fmt.Println(index, word)
	}
	fmt.Println("OK")
}
Output: 0 ZERO 1 ONE 2 TWO OK
Example (AutoConnect) ¶
package main
import (
"fmt"
"time"
"github.com/reactivego/rx"
)
// main demonstrates AutoConnect(2): a re-published view of a hot observable is
// only connected once a second subscriber arrives. In the expected output both
// subscribers start at 5, so the values produced during the sleep are not
// delivered to them.
func main() {
// Create a multicaster hot observable that will emit every 100 milliseconds
hot := rx.Interval[int](100 * time.Millisecond).Take(10).Publish()
// Connect immediately, using the Goroutine scheduler, and make sure the
// connection is released when main returns.
hotsub := hot.Connect(rx.Goroutine)
defer hotsub.Unsubscribe()
fmt.Println("Hot observable created and emitting 0,1,2,3,4,5,6 ...")
// Publish the hot observable again but only Connect to it when 2
// subscribers have connected.
source := hot.Take(5).Publish().AutoConnect(2)
// First subscriber
sub1 := source.Printf("Subscriber 1: %d\n").Go()
fmt.Println("First subscriber connected, waiting a bit...")
// Wait a bit, nothing will emit yet
time.Sleep(525 * time.Millisecond)
fmt.Println("Second subscriber connecting, emissions begin!")
// Second subscriber triggers the connection
sub2 := source.Printf("Subscriber 2: %d\n").Go()
// Wait for emissions to complete
hotsub.Wait()
sub1.Wait()
sub2.Wait()
}
Output: Hot observable created and emitting 0,1,2,3,4,5,6 ... First subscriber connected, waiting a bit... Second subscriber connecting, emissions begin! Subscriber 1: 5 Subscriber 2: 5 Subscriber 1: 6 Subscriber 2: 6 Subscriber 1: 7 Subscriber 2: 7 Subscriber 1: 8 Subscriber 2: 8 Subscriber 1: 9 Subscriber 2: 9
Example (BufferCount) ¶
package main
import (
"fmt"
"github.com/reactivego/rx"
)
// main runs BufferCount over the same four-element source with several
// bufferSize/startBufferEvery combinations, printing a label before the
// buffers produced by each run.
func main() {
	numbers := rx.From(0, 1, 2, 3)

	runs := []struct {
		label                        string
		bufferSize, startBufferEvery int
	}{
		{"BufferCount(From(0, 1, 2, 3), 2, 1)", 2, 1},
		{"BufferCount(From(0, 1, 2, 3), 2, 2)", 2, 2},
		{"BufferCount(From(0, 1, 2, 3), 2, 3)", 2, 3},
		{"BufferCount(From(0, 1, 2, 3), 3, 2)", 3, 2},
		{"BufferCount(From(0, 1, 2, 3), 6, 6)", 6, 6},
		{"BufferCount(From(0, 1, 2, 3), 2, 0)", 2, 0},
	}

	for _, run := range runs {
		fmt.Println(run.label)
		rx.BufferCount(numbers, run.bufferSize, run.startBufferEvery).Println().Wait()
	}
}
Output: BufferCount(From(0, 1, 2, 3), 2, 1) [0 1] [1 2] [2 3] [3] BufferCount(From(0, 1, 2, 3), 2, 2) [0 1] [2 3] BufferCount(From(0, 1, 2, 3), 2, 3) [0 1] [3] BufferCount(From(0, 1, 2, 3), 3, 2) [0 1 2] [2 3] BufferCount(From(0, 1, 2, 3), 6, 6) [0 1 2 3] BufferCount(From(0, 1, 2, 3), 2, 0) [0 1]
Example (ConcatAll) ¶
package main
import (
"fmt"
"time"
"github.com/reactivego/rx"
)
// main exercises ConcatAll three times: on an empty outer observable, on an
// outer observable holding a single empty inner observable, and on a stream
// of four simulated requests whose responses are printed before "OK".
func main() {
	const ms = time.Millisecond

	// respond builds an observable that emits "<request> response"; a
	// non-zero duration is applied through Delay.
	respond := func(request string, duration time.Duration) rx.Observable[string] {
		response := rx.From(request + " response")
		if duration != 0 {
			return response.Delay(duration)
		}
		return response
	}

	streams := rx.Empty[rx.Observable[string]]()
	rx.ConcatAll(streams).Wait()

	streams = rx.Of(rx.Empty[string]())
	rx.ConcatAll(streams).Wait()

	first := respond("first", 10*ms)
	second := respond("second", 20*ms)
	third := respond("third", 0*ms)
	fourth := respond("fourth", 60*ms)

	// The first request is available at once; the remaining three are
	// appended through a Delay of 100ms.
	later := rx.From(second, third, fourth).Delay(100 * ms)
	streams = rx.From(first).ConcatWith(later)
	rx.ConcatAll(streams).Println().Wait()

	fmt.Println("OK")
}
Output: first response second response third response fourth response OK
Example (Count) ¶
package main
import (
"fmt"
"github.com/reactivego/rx"
)
// main prints the result of Count for a five-element observable and for an
// empty one, then prints "OK".
func main() {
	rx.From(1, 2, 3, 4, 5).Count().Println().Wait()
	rx.Empty[int]().Count().Println().Wait()
	fmt.Println("OK")
}
Output: 5 0 OK
Example (ElementAt) ¶
package main
import (
"github.com/reactivego/rx"
)
// main prints the element selected by ElementAt(2) from a five-element
// observable.
func main() {
	numbers := rx.From(0, 1, 2, 3, 4)
	selected := numbers.ElementAt(2)
	selected.Println().Wait()
}
Output: 2
Example (ExhaustAll) ¶
package main
import (
"fmt"
"strconv"
"time"
"github.com/reactivego/rx"
)
// main feeds ExhaustAll a timed stream of inner observables and prints what
// comes out, followed by "success" when Wait reports no error. In the
// expected output only the "a" and "c" streams appear.
func main() {
	const ms = time.Millisecond

	// named yields count strings of the form "<name>-<n>", driven by a Timer
	// with the given period.
	named := func(name string, period time.Duration, count int) rx.Observable[string] {
		ticks := rx.Timer[int](0*ms, period)
		label := func(n int) string {
			return name + "-" + strconv.Itoa(n)
		}
		return rx.Map(ticks, label).Take(count)
	}

	inner := []rx.Observable[string]{
		named("a", 20*ms, 3),
		named("b", 20*ms, 3),
		named("c", 20*ms, 3),
		rx.Empty[string](),
	}

	// Each value of the selector timer picks the inner observable at that index.
	selector := rx.Timer[int](20*ms, 30*ms, 250*ms, 100*ms).Take(4)
	outer := rx.Map(selector, func(i int) rx.Observable[string] {
		return inner[i]
	})

	if err := rx.ExhaustAll(outer).Println().Wait(); err == nil {
		fmt.Println("success")
	}
}
Output: a-0 a-1 a-2 c-0 c-1 c-2 success
Example (Marshal) ¶
package main
import (
"encoding/json"
"github.com/reactivego/rx"
)
// main marshals a single struct value with json.Marshal via the Marshal
// operator, converts the resulting bytes to a string and prints it.
func main() {
	type record struct {
		A string `json:"a"`
		B string `json:"b"`
	}

	encoded := rx.Of(record{"Hello", "World"}).Marshal(json.Marshal)
	text := rx.Map(encoded, func(data []byte) string {
		return string(data)
	})
	text.Println().Wait()
}
Output: {"a":"Hello","b":"World"}
Example (MergeMap) ¶
package main
import (
"fmt"
"github.com/reactivego/rx"
)
// main maps every URL to an observable carrying its (fake) content and prints
// the merged result.
func main() {
	urls := rx.From("https://reactivego.io", "https://github.com/reactivego")

	// fetch stands in for a real request: it immediately yields a single
	// string describing the URL.
	fetch := func(url string) rx.Observable[string] {
		return rx.Of(fmt.Sprintf("content of %q", url))
	}

	rx.MergeMap(urls, fetch).Println().Go().Wait()
}
Output: content of "https://reactivego.io" content of "https://github.com/reactivego"
Example (MergeMapSubject) ¶
package main
import (
"fmt"
"time"
"github.com/reactivego/rx"
)
// main pipes the merged (fake) fetch results into a Subject and then
// subscribes to that subject three times in a row. In the expected output the
// first subscription prints all three items and the later two print only the
// last two.
func main() {
source := rx.From("https://google.com", "https://reactivego.io", "https://github.com/reactivego")
// Every URL is mapped to an observable that yields one descriptive string.
merged := rx.MergeMap(source, func(next string) rx.Observable[string] {
fakeFetchData := rx.Of(fmt.Sprintf("content of %q", next))
return fakeFetchData
})
// subject remembers last 2 emits by the observer for an hour.
observer, subject := rx.Subject[string](time.Hour, 2)
// Forward everything from merged into the subject through its observer side.
merged.Tap(observer).Go()
// Sees all emits by the subject
subject.Println().Go().Wait()
// Sees only last 2 emits
subject.Println().Go().Wait()
// Sees only last 2 emits
subject.Println().Go().Wait()
}
Output: content of "https://google.com" content of "https://reactivego.io" content of "https://github.com/reactivego" content of "https://reactivego.io" content of "https://github.com/reactivego" content of "https://reactivego.io" content of "https://github.com/reactivego"
Example (Multicast) ¶
package main
import (
"fmt"
"github.com/reactivego/rx"
)
// main shows a Multicast of size 1 driven from a serial scheduler: values sent
// before any subscription exists are not seen by later subscribers, and both
// subscribers receive the values emitted afterwards. Expected output:
// 2 2 3 3 rx rx.
func main() {
serial := rx.NewScheduler()
in, out := rx.Multicast[int](1)
// Ignore everything before any subscriptions, including the last!
in.Next(-2)
in.Next(-1)
in.Next(0)
in.Next(1)
// Schedule the subsequent emits in a loop. This will be the first task to
// run on the serial scheduler after the subscriptions have been added.
serial.ScheduleLoop(2, func(index int, again func(next int)) {
if index < 4 {
in.Next(index)
again(index + 1)
} else {
// Finish the multicast with rx.Err; this is the "rx" printed for both
// subscriptions at the end.
in.Done(rx.Err)
}
})
// Add a couple of subscriptions
sub1 := out.Println().Go(serial)
sub2 := out.Println().Go(serial)
// Let the scheduler run and wait for all of its scheduled tasks to finish.
serial.Wait()
// Print the terminal error reported by each subscription.
fmt.Println(sub1.Wait())
fmt.Println(sub2.Wait())
}
Output: 2 2 3 3 rx rx
Example (MulticastDrop) ¶
package main
import (
"fmt"
"github.com/reactivego/rx"
)
// main shows a Multicast created with a negative size, which selects dropping
// instead of blocking when the buffer is full. Expected output:
// 2 2 <nil> <nil>.
func main() {
serial := rx.NewScheduler()
// Multiplying the buffer size by -1 makes the size argument negative.
const onBackpressureDrop = -1
// multicast with backpressure handling set to dropping incoming
// items that don't fit in the buffer once it has filled up.
in, out := rx.Multicast[int](1 * onBackpressureDrop)
// ignore everything before any subscriptions, including the last!
in.Next(-2)
in.Next(-1)
in.Next(0)
in.Next(1)
// add a couple of subscriptions
sub1 := out.Println().Go(serial)
sub2 := out.Println().Go(serial)
// These calls run before serial.Wait() lets the subscribers drain anything.
in.Next(2) // accepted: buffer not full
in.Next(3) // dropped: buffer full
in.Done(rx.Err) // dropped: buffer full
serial.Wait()
// Both Wait calls print <nil> in the expected output.
fmt.Println(sub1.Wait())
fmt.Println(sub2.Wait())
}
Output: 2 2 <nil> <nil>
Example (Race) ¶
package main
import (
"errors"
"fmt"
"time"
"github.com/reactivego/rx"
)
// main races three delayed responses and prints what the race yields, then
// races three delayed errors and prints the error returned by Wait. The
// expected output is "second response" followed by "first error".
func main() {
	const ms = time.Millisecond

	// respond emits "<request> response" after the given delay.
	respond := func(request string, delay time.Duration) rx.Observable[string] {
		return rx.From(request + " response").Delay(delay)
	}
	responses := []rx.Observable[string]{
		respond("first", 50*ms),
		respond("second", 10*ms),
		respond("third", 60*ms),
	}
	rx.Race(responses...).Println().Wait()

	// fail produces an observable that throws "<text> error", with Delay applied.
	fail := func(text string, delay time.Duration) rx.Observable[int] {
		return rx.Throw[int](errors.New(text + " error")).Delay(delay)
	}
	failures := []rx.Observable[int]{
		fail("first", 10*ms),
		fail("second", 20*ms),
		fail("third", 30*ms),
	}
	fmt.Println(rx.Race(failures...).Wait(rx.Goroutine))
}
Output: second response first error
Example (Retry) ¶
package main
import (
"fmt"
"github.com/reactivego/rx"
)
// main builds an observable that emits 0, 1, 2 and then finishes with rx.Err
// the first time only; on later runs it finishes without an error. With Retry
// the expected output is the sequence printed twice, then "true" and "<nil>".
func main() {
	// pending holds the error to report on the next termination; it is
	// cleared once it has been handed out.
	var pending error = rx.Err

	source := rx.Create(func(index int) (int, error, bool) {
		if index >= 3 {
			failure := pending
			pending = nil
			return 0, failure, true
		}
		return index, nil, false
	})

	result := source.Retry().Println().Wait()
	fmt.Println(pending == nil)
	fmt.Println(result)
}
Output: 0 1 2 0 1 2 true <nil>
Example (Skip) ¶
package main
import (
"github.com/reactivego/rx"
)
// main prints what remains of a five-element observable after Skip(2).
func main() {
	numbers := rx.From(1, 2, 3, 4, 5)
	numbers.Skip(2).Println().Wait()
}
Output: 3 4 5
Example (Subject) ¶
package main
import (
"fmt"
"github.com/reactivego/rx"
)
// main shows a Subject with no max age and a buffer of one item: of the values
// sent before any subscription only the last one reaches the subscribers, and
// the values scheduled afterwards reach both. Expected output:
// 1 1 2 2 3 3 rx rx.
func main() {
serial := rx.NewScheduler()
// subject collects emits when there are no subscriptions active.
in, out := rx.Subject[int](0, 1)
// ignore everything before any subscriptions, except the last because buffer size is 1
in.Next(-2)
in.Next(-1)
in.Next(0)
in.Next(1)
// add a couple of subscriptions
sub1 := out.Println().Go(serial)
sub2 := out.Println().Go(serial)
// schedule the subsequent emits on the serial scheduler otherwise these calls
// will block because the buffer is full.
// subject will detect usage of scheduler on observable side and use it on the
// observer side to keep the data flow through the subject going.
serial.Schedule(func() {
in.Next(2)
in.Next(3)
// Finish with rx.Err; this is the "rx" printed for both subscriptions.
in.Done(rx.Err)
})
// Run the scheduled work to completion before inspecting the subscriptions.
serial.Wait()
fmt.Println(sub1.Wait())
fmt.Println(sub2.Wait())
}
Output: 1 1 2 2 3 3 rx rx
Example (SwitchAll) ¶
package main
import (
"fmt"
"time"
"github.com/reactivego/rx"
)
// main maps every tick of a 42ms interval to the same 16ms interval observable
// and flattens the result with SwitchAll, printing "success" when Wait
// reports no error.
func main() {
	const ms = time.Millisecond

	outer := rx.Interval[int](42 * ms).Take(4)
	inner := rx.Interval[int](16 * ms).Take(4)

	// The tick value itself is not used; every tick selects inner.
	nested := rx.Map(outer, func(int) rx.Observable[int] {
		return inner
	})

	if err := rx.SwitchAll(nested).Println().Wait(rx.Goroutine); err == nil {
		fmt.Println("success")
	}
}
Output: 0 1 0 1 0 1 0 1 2 3 success
Example (SwitchMap) ¶
package main
import (
"fmt"
"time"
"github.com/reactivego/rx"
)
// main issues three simulated web requests, one per tick of a 20ms interval,
// through SwitchMap and prints the results followed by "success" when Wait
// reports no error. The expected output contains only the "second" and
// "latest" results.
func main() {
	const ms = time.Millisecond

	// request emits "<name> result" after the given delay.
	request := func(name string, delay time.Duration) rx.Observable[string] {
		return rx.From(name + " result").Delay(delay)
	}

	// The tick value indexes into this list; anything outside it maps to an
	// empty observable.
	requests := []rx.Observable[string]{
		request("first", 50*ms),
		request("second", 10*ms),
		request("latest", 50*ms),
	}

	ticks := rx.Interval[int](20 * ms).Take(3)
	results := rx.SwitchMap(ticks, func(i int) rx.Observable[string] {
		if i >= 0 && i < len(requests) {
			return requests[i]
		}
		return rx.Empty[string]()
	})

	if err := results.Println().Wait(); err == nil {
		fmt.Println("success")
	}
}
Output: second result latest result success
Example (Values) ¶
package main
import (
"fmt"
"github.com/reactivego/rx"
"github.com/reactivego/scheduler"
)
// main iterates over the values of an observable twice, once with the
// concurrent Goroutine scheduler and once with a freshly created scheduler,
// printing every value and a final "OK".
func main() {
	odds := rx.From(1, 3, 5)

	// Why the Goroutine scheduler? An observable may sit at the root of a
	// tree of separately running observables whose responses are merged;
	// the Goroutine scheduler lets those observables run concurrently.
	// Here the observable runs on one or more goroutines, and the loop body
	// is called from a newly created goroutine.
	for value := range odds.Values(scheduler.Goroutine) {
		fmt.Println(value)
	}

	// Here the observable runs on the current goroutine.
	for value := range odds.Values(scheduler.New()) {
		fmt.Println(value)
	}

	fmt.Println("OK")
}
Output: 1 3 5 1 3 5 OK
Index ¶
- Variables
- func All2[T, U any](observable Observable[Tuple2[T, U]], scheduler ...Scheduler) iter.Seq2[T, U]
- func Equal[T comparable]() func(T, T) bool
- func Multicast[T any](size int) (Observer[T], Observable[T])
- func Must[T any](t T, err error) T
- func Subject[T any](age time.Duration, capacity ...int) (Observer[T], Observable[T])
- type ConcurrentScheduler
- type Connectable
- type Connector
- type Creator
- type Float
- type Integer
- type MaxBufferSizeOption
- type Observable
- func AsObservable[T any](observable Observable[any]) Observable[T]
- func BufferCount[T any](observable Observable[T], bufferSize, startBufferEvery int) Observable[[]T]
- func CombineAll[T any](observable Observable[Observable[T]]) Observable[[]T]
- func CombineLatest[T any](observables ...Observable[T]) Observable[[]T]
- func CombineLatest2[T, U any](first Observable[T], second Observable[U]) Observable[Tuple2[T, U]]
- func CombineLatest3[T, U, V any](first Observable[T], second Observable[U], third Observable[V]) Observable[Tuple3[T, U, V]]
- func CombineLatest4[T, U, V, W any](first Observable[T], second Observable[U], third Observable[V], ...) Observable[Tuple4[T, U, V, W]]
- func CombineLatest5[T, U, V, W, X any](first Observable[T], second Observable[U], third Observable[V], ...) Observable[Tuple5[T, U, V, W, X]]
- func Concat[T any](observables ...Observable[T]) Observable[T]
- func ConcatAll[T any](observable Observable[Observable[T]]) Observable[T]
- func ConcatMap[T, U any](observable Observable[T], project func(T) Observable[U]) Observable[U]
- func Create[T any](create Creator[T]) Observable[T]
- func Defer[T any](factory func() Observable[T]) Observable[T]
- func Empty[T any]() Observable[T]
- func ExhaustAll[T any](observable Observable[Observable[T]]) Observable[T]
- func ExhaustMap[T, U any](observable Observable[T], project func(T) Observable[U]) Observable[U]
- func From[T any](slice ...T) Observable[T]
- func Interval[T Integer | Float](interval time.Duration) Observable[T]
- func Map[T, U any](observable Observable[T], project func(T) U) Observable[U]
- func MapE[T, U any](observable Observable[T], project func(T) (U, error)) Observable[U]
- func Merge[T any](observables ...Observable[T]) Observable[T]
- func MergeAll[T any](observable Observable[Observable[T]]) Observable[T]
- func MergeMap[T, U any](observable Observable[T], project func(T) Observable[U]) Observable[U]
- func Never[T any]() Observable[T]
- func Of[T any](value T) Observable[T]
- func Pull[T any](seq iter.Seq[T]) Observable[T]
- func Pull2[T, U any](seq iter.Seq2[T, U]) Observable[Tuple2[T, U]]
- func Race[T any](observables ...Observable[T]) Observable[T]
- func Recv[T any](ch <-chan T) Observable[T]
- func Reduce[T, U any](observable Observable[T], seed U, accumulator func(acc U, next T) U) Observable[U]
- func ReduceE[T, U any](observable Observable[T], seed U, accumulator func(acc U, next T) (U, error)) Observable[U]
- func Scan[T, U any](observable Observable[T], seed U, accumulator func(acc U, next T) U) Observable[U]
- func ScanE[T, U any](observable Observable[T], seed U, accumulator func(acc U, next T) (U, error)) Observable[U]
- func SwitchAll[T any](observable Observable[Observable[T]]) Observable[T]
- func SwitchMap[T, U any](o Observable[T], project func(T) Observable[U]) Observable[U]
- func Throw[T any](err error) Observable[T]
- func Ticker(initialDelay time.Duration, intervals ...time.Duration) Observable[time.Time]
- func Timer[T Integer | Float](initialDelay time.Duration, intervals ...time.Duration) Observable[T]
- func WithLatestFrom[T any](observables ...Observable[T]) Observable[[]T]
- func WithLatestFrom2[T, U any](first Observable[T], second Observable[U]) Observable[Tuple2[T, U]]
- func WithLatestFrom3[T, U, V any](first Observable[T], second Observable[U], third Observable[V]) Observable[Tuple3[T, U, V]]
- func WithLatestFrom4[T, U, V, W any](first Observable[T], second Observable[U], third Observable[V], ...) Observable[Tuple4[T, U, V, W]]
- func WithLatestFrom5[T, U, V, W, X any](first Observable[T], second Observable[U], third Observable[V], ...) Observable[Tuple5[T, U, V, W, X]]
- func WithLatestFromAll[T any](observable Observable[Observable[T]]) Observable[[]T]
- func Zip[T any](observables ...Observable[T]) Observable[[]T]
- func Zip2[T, U any](first Observable[T], second Observable[U], options ...MaxBufferSizeOption) Observable[Tuple2[T, U]]
- func Zip3[T, U, V any](first Observable[T], second Observable[U], third Observable[V], ...) Observable[Tuple3[T, U, V]]
- func Zip4[T, U, V, W any](first Observable[T], second Observable[U], third Observable[V], ...) Observable[Tuple4[T, U, V, W]]
- func Zip5[T, U, V, W, X any](first Observable[T], second Observable[U], third Observable[V], ...) Observable[Tuple5[T, U, V, W, X]]
- func ZipAll[T any](observable Observable[Observable[T]], options ...MaxBufferSizeOption) Observable[[]T]
- func (observable Observable[T]) All(scheduler ...Scheduler) iter.Seq2[int, T]
- func (observable Observable[T]) Append(slice *[]T) Observable[T]
- func (observable Observable[T]) AsObservable() Observable[any]
- func (observable Observable[T]) Assign(value *T) Observable[T]
- func (observable Observable[T]) AutoUnsubscribe() Observable[T]
- func (observable Observable[T]) Catch(other Observable[T]) Observable[T]
- func (observable Observable[T]) CatchError(selector func(err error, caught Observable[T]) Observable[T]) Observable[T]
- func (observable Observable[T]) ConcatWith(others ...Observable[T]) Observable[T]
- func (observable Observable[T]) Count() Observable[int]
- func (observable Observable[T]) Delay(duration time.Duration) Observable[T]
- func (observable Observable[T]) DistinctUntilChanged(equal func(T, T) bool) Observable[T]
- func (observable Observable[T]) Do(f func(T)) Observable[T]
- func (observable Observable[T]) ElementAt(n int) Observable[T]
- func (observable Observable[T]) EndWith(values ...T) Observable[T]
- func (observable Observable[T]) Filter(predicate func(T) bool) Observable[T]
- func (observable Observable[T]) First(schedulers ...Scheduler) (value T, err error)
- func (observable Observable[T]) Fprint(out io.Writer) Observable[T]
- func (observable Observable[T]) Fprintf(out io.Writer, format string) Observable[T]
- func (observable Observable[T]) Fprintln(out io.Writer) Observable[T]
- func (observable Observable[T]) Go(schedulers ...Scheduler) Subscription
- func (observable Observable[T]) Last(schedulers ...Scheduler) (value T, err error)
- func (observable Observable[T]) Map(project func(T) any) Observable[any]
- func (observable Observable[T]) MapE(project func(T) (any, error)) Observable[any]
- func (observable Observable[T]) Marshal(marshal func(any) ([]byte, error)) Observable[[]byte]
- func (observable Observable[T]) MergeWith(others ...Observable[T]) Observable[T]
- func (observable Observable[T]) OnComplete(f func()) Observable[T]
- func (observable Observable[T]) OnDone(f func(error)) Observable[T]
- func (observable Observable[T]) OnError(f func(error)) Observable[T]
- func (observable Observable[T]) OnNext(f func(T)) Observable[T]
- func (observable Observable[T]) Passthrough() Observable[T]
- func (observable Observable[T]) Pipe(segments ...Pipe[T]) Observable[T]
- func (observable Observable[T]) Print() Observable[T]
- func (observable Observable[T]) Printf(format string) Observable[T]
- func (observable Observable[T]) Println() Observable[T]
- func (observable Observable[T]) Publish() Connectable[T]
- func (observable Observable[T]) RaceWith(others ...Observable[T]) Observable[T]
- func (observable Observable[T]) Repeat(count ...int) Observable[T]
- func (observable Observable[T]) Retry(limit ...int) Observable[T]
- func (observable Observable[T]) RetryTime(backoff func(int) time.Duration, limit ...int) Observable[T]
- func (observable Observable[T]) SampleTime(window time.Duration) Observable[T]
- func (observable Observable[T]) Send(ch chan<- T) Observable[T]
- func (observable Observable[T]) Share() Observable[T]
- func (observable Observable[T]) Skip(n int) Observable[T]
- func (observable Observable[T]) Slice(schedulers ...Scheduler) (slice []T, err error)
- func (observable Observable[T]) StartWith(values ...T) Observable[T]
- func (observable Observable[T]) Subscribe(observe Observer[T], scheduler Scheduler) Subscription
- func (observable Observable[T]) SubscribeOn(scheduler ConcurrentScheduler) Observable[T]
- func (observable Observable[T]) Take(n int) Observable[T]
- func (observable Observable[T]) TakeWhile(condition func(T) bool) Observable[T]
- func (observable Observable[T]) Tap(tap Observer[T]) Observable[T]
- func (observable Observable[T]) Values(scheduler ...Scheduler) iter.Seq[T]
- func (observable Observable[T]) Wait(schedulers ...Scheduler) error
- type Observer
- type Pipe
- func Append[T any](slice *[]T) Pipe[T]
- func Assign[T any](value *T) Pipe[T]
- func AutoUnsubscribe[T any]() Pipe[T]
- func Catch[T any](other Observable[T]) Pipe[T]
- func CatchError[T any](selector func(err error, caught Observable[T]) Observable[T]) Pipe[T]
- func ConcatWith[T any](others ...Observable[T]) Pipe[T]
- func Delay[T any](duration time.Duration) Pipe[T]
- func DistinctUntilChanged[T any](equal func(T, T) bool) Pipe[T]
- func Do[T any](do func(T)) Pipe[T]
- func ElementAt[T any](n int) Pipe[T]
- func EndWith[T any](values ...T) Pipe[T]
- func Filter[T any](predicate func(T) bool) Pipe[T]
- func Fprint[T any](out io.Writer) Pipe[T]
- func Fprintf[T any](out io.Writer, format string) Pipe[T]
- func Fprintln[T any](out io.Writer) Pipe[T]
- func MergeWith[T any](others ...Observable[T]) Pipe[T]
- func OnComplete[T any](onComplete func()) Pipe[T]
- func OnDone[T any](onDone func(error)) Pipe[T]
- func OnError[T any](onError func(error)) Pipe[T]
- func OnNext[T any](onNext func(T)) Pipe[T]
- func Passthrough[T any]() Pipe[T]
- func Print[T any]() Pipe[T]
- func Printf[T any](format string) Pipe[T]
- func Println[T any]() Pipe[T]
- func RaceWith[T any](others ...Observable[T]) Pipe[T]
- func Repeat[T any](count ...int) Pipe[T]
- func Retry[T any](limit ...int) Pipe[T]
- func Send[T any](ch chan<- T) Pipe[T]
- func Skip[T any](n int) Pipe[T]
- func StartWith[T any](values ...T) Pipe[T]
- func Take[T any](n int) Pipe[T]
- func TakeWhile[T any](condition func(T) bool) Pipe[T]
- func Tap[T any](tap Observer[T]) Pipe[T]
- type Scheduler
- type Signed
- type Subscriber
- type Subscription
- type Tuple2
- type Tuple3
- type Tuple4
- type Tuple5
- type Unsigned
Examples ¶
- Package (All)
- Package (AutoConnect)
- Package (BufferCount)
- Package (ConcatAll)
- Package (Count)
- Package (ElementAt)
- Package (ExhaustAll)
- Package (Marshal)
- Package (MergeMap)
- Package (MergeMapSubject)
- Package (Multicast)
- Package (MulticastDrop)
- Package (Race)
- Package (Retry)
- Package (Share)
- Package (Skip)
- Package (Subject)
- Package (SwitchAll)
- Package (SwitchMap)
- Package (Values)
Constants ¶
This section is empty.
Variables ¶
var Err = errors.New("rx")
Err declares the base error that is joined with every error returned by this package. It serves as the foundation for error types in the reactive extensions library, allowing for error type checking and custom error creation with consistent taxonomy.
var ErrOutOfSubjectSubscriptions = errors.Join(Err, errors.New("out of subject subscriptions"))
ErrSubscriptionActive is the error returned by Err() when the subscription is still active and has not yet completed or been canceled.
ErrSubscriptionCanceled is the error returned by Wait() and Err() when the subscription was canceled by calling Unsubscribe() on the Subscription. This indicates the subscription was terminated by the subscriber rather than by the observable completing normally or with an error.
ErrTypecastFailed is returned when a type conversion fails during observer operations, typically when using AsObserver() to convert between generic and typed observers.
var Goroutine = scheduler.Goroutine
var NewScheduler = scheduler.New
Functions ¶
func All2 ¶ added in v0.2.2
All2 converts an Observable of Tuple2 pairs into an iterator sequence. It transforms an Observable[Tuple2[T, U]] into an iter.Seq2[T, U] that yields each tuple's components. The scheduler parameter is optional and determines the execution context. Note: This method ignores any errors from the observable stream.
func Equal ¶ added in v0.2.0
func Equal[T comparable]() func(T, T) bool
func Multicast ¶ added in v0.2.0
func Multicast[T any](size int) (Observer[T], Observable[T])
Multicast returns both an Observer and an Observable. The returned Observer is used to send items into the Multicast. The returned Observable is used to subscribe to the Multicast. The Multicast multicasts items sent through the Observer to every Subscriber of the Observable.
size: size of the item buffer, i.e. the number of items kept to replay to a new Subscriber.
Backpressure handling depends on the sign of the size argument. For positive size the multicast will block when one of the subscribers lets the buffer fill up. For negative size the multicast will drop items on the blocking subscriber, allowing the others to keep on receiving values. For hot observables dropping is preferred.
func Subject ¶
Subject returns both an Observer and an Observable. The returned Observer is used to send items into the Subject. The returned Observable is used to subscribe to the Subject. The Subject multicasts items sent through the Observer to every Subscriber of the Observable.
age: max age to keep items in order to replay them to a new Subscriber (0 = no max age). [size]: size of the item buffer, number of items kept to replay to a new Subscriber. [cap]: capacity of the item buffer, number of items that can be observed before blocking. [scap]: capacity of the subscription list, max number of simultaneous subscribers.
Types ¶
type ConcurrentScheduler ¶ added in v0.2.0
type ConcurrentScheduler = scheduler.ConcurrentScheduler
type Connectable ¶
type Connectable[T any] struct { Observable[T] Connector }
Connectable[T] is an Observable[T] that provides delayed connection to its source. It combines both Observable and Connector interfaces:
- Subscribe: Allows consumers to register for notifications from this Observable
- Connect: Triggers the actual subscription to the underlying source Observable
The key feature of Connectable[T] is that it doesn't subscribe to its source until the Connect method is explicitly called, allowing multiple observers to subscribe before the source begins emitting items (multicast behavior).
func (Connectable[T]) AutoConnect ¶ added in v0.2.0
func (connectable Connectable[T]) AutoConnect(count int) Observable[T]
AutoConnect returns an Observable that automatically connects to the Connectable source when a specified number of subscribers subscribe to it.
When the specified number of subscribers (count) is reached, the Connectable source is connected, allowing it to start emitting items. The connection is shared among all subscribers. When all subscribers unsubscribe, the connection is terminated.
If count is less than 1, it returns an Observable that emits an ErrInvalidCount error.
func (Connectable[T]) RefCount ¶ added in v0.2.0
func (connectable Connectable[T]) RefCount() Observable[T]
RefCount converts a Connectable Observable into a standard Observable that automatically connects when the first subscriber subscribes and disconnects when the last subscriber unsubscribes.
When the first subscriber subscribes to the resulting Observable, it automatically calls Connect() on the source Connectable Observable. The connection is shared among all subscribers. When the last subscriber unsubscribes, the connection is automatically closed.
This is useful for efficiently sharing expensive resources (like network connections) among multiple subscribers.
type Connector ¶ added in v0.2.3
type Connector func(Scheduler, Subscriber)
Connector provides the Connect method for a Connectable[T].
func (Connector) Connect ¶ added in v0.2.3
func (connect Connector) Connect(schedulers ...Scheduler) Subscription
Connect instructs a Connectable[T] to subscribe to its source and begin emitting items to its subscribers. Connect accepts an optional scheduler argument.
type Creator ¶ added in v0.2.0
Creator[T] is a function type that generates values for an Observable stream.
The Creator function receives a zero-based index for the current iteration and returns a tuple containing:
- Next: The next value to emit (of type T)
- Err: Any error that occurred during value generation
- Done: Boolean flag indicating whether the sequence is complete
When Done is true, the Observable will complete after emitting any provided error. When Err is non-nil, the Observable will emit the error and then complete.
type Float ¶ added in v0.2.2
Float is a constraint that permits any floating-point type. If future releases of Go add new predeclared floating-point types, this constraint will be modified to include them.
type Integer ¶ added in v0.2.2
Integer is a constraint that permits any integer type. If future releases of Go add new predeclared integer types, this constraint will be modified to include them.
type MaxBufferSizeOption ¶ added in v0.2.1
type MaxBufferSizeOption = func(*int)
MaxBufferSizeOption is a function type used for configuring the maximum buffer size of an observable stream.
func WithMaxBufferSize ¶ added in v0.2.1
func WithMaxBufferSize(n int) MaxBufferSizeOption
WithMaxBufferSize creates a MaxBufferSizeOption that sets the maximum buffer size to n. This option is typically used when creating new observables to control memory usage.
type Observable ¶
type Observable[T any] func(Observer[T], Scheduler, Subscriber)
func AsObservable ¶ added in v0.2.0
func AsObservable[T any](observable Observable[any]) Observable[T]
func BufferCount ¶ added in v0.2.0
func BufferCount[T any](observable Observable[T], bufferSize, startBufferEvery int) Observable[[]T]
func CombineAll ¶ added in v0.2.0
func CombineAll[T any](observable Observable[Observable[T]]) Observable[[]T]
func CombineLatest ¶
func CombineLatest[T any](observables ...Observable[T]) Observable[[]T]
func CombineLatest2 ¶ added in v0.2.1
func CombineLatest2[T, U any](first Observable[T], second Observable[U]) Observable[Tuple2[T, U]]
func CombineLatest3 ¶ added in v0.2.1
func CombineLatest3[T, U, V any](first Observable[T], second Observable[U], third Observable[V]) Observable[Tuple3[T, U, V]]
func CombineLatest4 ¶ added in v0.2.1
func CombineLatest4[T, U, V, W any](first Observable[T], second Observable[U], third Observable[V], fourth Observable[W]) Observable[Tuple4[T, U, V, W]]
func CombineLatest5 ¶ added in v0.2.1
func CombineLatest5[T, U, V, W, X any](first Observable[T], second Observable[U], third Observable[V], fourth Observable[W], fifth Observable[X]) Observable[Tuple5[T, U, V, W, X]]
func Concat ¶
func Concat[T any](observables ...Observable[T]) Observable[T]
func ConcatAll ¶ added in v0.2.0
func ConcatAll[T any](observable Observable[Observable[T]]) Observable[T]
func ConcatMap ¶ added in v0.2.0
func ConcatMap[T, U any](observable Observable[T], project func(T) Observable[U]) Observable[U]
func Create ¶
func Create[T any](create Creator[T]) Observable[T]
Create constructs a new Observable from a Creator function.
The Creator function is called repeatedly with an incrementing index value, and returns a tuple of (next value, error, done flag). The Observable will continue producing values until either:
- The Creator signals completion by returning done=true
- The Observer unsubscribes
- The Creator returns an error (which will be emitted with done=true)
This function provides a bridge between imperative code and the reactive Observable pattern.
func Defer ¶
func Defer[T any](factory func() Observable[T]) Observable[T]
func Empty ¶
func Empty[T any]() Observable[T]
func ExhaustAll ¶ added in v0.2.0
func ExhaustAll[T any](observable Observable[Observable[T]]) Observable[T]
func ExhaustMap ¶ added in v0.2.0
func ExhaustMap[T, U any](observable Observable[T], project func(T) Observable[U]) Observable[U]
func From ¶
func From[T any](slice ...T) Observable[T]
func Map ¶ added in v0.2.0
func Map[T, U any](observable Observable[T], project func(T) U) Observable[U]
func MapE ¶ added in v0.2.0
func MapE[T, U any](observable Observable[T], project func(T) (U, error)) Observable[U]
func Merge ¶
func Merge[T any](observables ...Observable[T]) Observable[T]
func MergeAll ¶ added in v0.2.0
func MergeAll[T any](observable Observable[Observable[T]]) Observable[T]
func MergeMap ¶ added in v0.2.0
func MergeMap[T, U any](observable Observable[T], project func(T) Observable[U]) Observable[U]
func Never ¶
func Never[T any]() Observable[T]
func Of ¶
func Of[T any](value T) Observable[T]
func Race ¶ added in v0.2.0
func Race[T any](observables ...Observable[T]) Observable[T]
func Recv ¶ added in v0.2.0
func Recv[T any](ch <-chan T) Observable[T]
func Reduce ¶ added in v0.2.0
func Reduce[T, U any](observable Observable[T], seed U, accumulator func(acc U, next T) U) Observable[U]
func ReduceE ¶ added in v0.2.1
func ReduceE[T, U any](observable Observable[T], seed U, accumulator func(acc U, next T) (U, error)) Observable[U]
func Scan ¶ added in v0.2.0
func Scan[T, U any](observable Observable[T], seed U, accumulator func(acc U, next T) U) Observable[U]
func ScanE ¶ added in v0.2.0
func ScanE[T, U any](observable Observable[T], seed U, accumulator func(acc U, next T) (U, error)) Observable[U]
func SwitchAll ¶ added in v0.2.0
func SwitchAll[T any](observable Observable[Observable[T]]) Observable[T]
func SwitchMap ¶ added in v0.2.0
func SwitchMap[T, U any](o Observable[T], project func(T) Observable[U]) Observable[U]
func Throw ¶
func Throw[T any](err error) Observable[T]
func Ticker ¶
Ticker creates an ObservableTime that emits a sequence of timestamps after an initialDelay has passed. Subsequent timestamps are emitted using a schedule of intervals passed in. If only the initialDelay is given, Ticker will emit only once.
func WithLatestFrom ¶ added in v0.2.0
func WithLatestFrom[T any](observables ...Observable[T]) Observable[[]T]
func WithLatestFrom2 ¶ added in v0.2.1
func WithLatestFrom2[T, U any](first Observable[T], second Observable[U]) Observable[Tuple2[T, U]]
func WithLatestFrom3 ¶ added in v0.2.1
func WithLatestFrom3[T, U, V any](first Observable[T], second Observable[U], third Observable[V]) Observable[Tuple3[T, U, V]]
func WithLatestFrom4 ¶ added in v0.2.1
func WithLatestFrom4[T, U, V, W any](first Observable[T], second Observable[U], third Observable[V], fourth Observable[W]) Observable[Tuple4[T, U, V, W]]
func WithLatestFrom5 ¶ added in v0.2.1
func WithLatestFrom5[T, U, V, W, X any](first Observable[T], second Observable[U], third Observable[V], fourth Observable[W], fifth Observable[X]) Observable[Tuple5[T, U, V, W, X]]
func WithLatestFromAll ¶ added in v0.2.0
func WithLatestFromAll[T any](observable Observable[Observable[T]]) Observable[[]T]
func Zip ¶ added in v0.2.1
func Zip[T any](observables ...Observable[T]) Observable[[]T]
func Zip2 ¶ added in v0.2.1
func Zip2[T, U any](first Observable[T], second Observable[U], options ...MaxBufferSizeOption) Observable[Tuple2[T, U]]
func Zip3 ¶ added in v0.2.1
func Zip3[T, U, V any](first Observable[T], second Observable[U], third Observable[V], options ...MaxBufferSizeOption) Observable[Tuple3[T, U, V]]
func Zip4 ¶ added in v0.2.1
func Zip4[T, U, V, W any](first Observable[T], second Observable[U], third Observable[V], fourth Observable[W], options ...MaxBufferSizeOption) Observable[Tuple4[T, U, V, W]]
func Zip5 ¶ added in v0.2.1
func Zip5[T, U, V, W, X any](first Observable[T], second Observable[U], third Observable[V], fourth Observable[W], fifth Observable[X], options ...MaxBufferSizeOption) Observable[Tuple5[T, U, V, W, X]]
func ZipAll ¶ added in v0.2.1
func ZipAll[T any](observable Observable[Observable[T]], options ...MaxBufferSizeOption) Observable[[]T]
func (Observable[T]) All ¶
func (observable Observable[T]) All(scheduler ...Scheduler) iter.Seq2[int, T]
All converts an Observable stream into an iterator sequence that pairs each element with its index. It returns an iter.Seq2[int, T] which yields each element along with its position in the sequence. The scheduler parameter is optional and determines the execution context. Note: This method ignores any errors from the observable stream.
func (Observable[T]) Append ¶ added in v0.2.1
func (observable Observable[T]) Append(slice *[]T) Observable[T]
Append is a method variant of the Append function that appends each emitted value to the provided slice while forwarding all emissions to downstream operators. This is a convenience method that calls the standalone Append function.
func (Observable[T]) AsObservable ¶
func (observable Observable[T]) AsObservable() Observable[any]
func (Observable[T]) Assign ¶ added in v0.2.0
func (observable Observable[T]) Assign(value *T) Observable[T]
Assign is a method version of the Assign function. It assigns every next value that is not done to the provided variable.
func (Observable[T]) AutoUnsubscribe ¶
func (observable Observable[T]) AutoUnsubscribe() Observable[T]
func (Observable[T]) Catch ¶
func (observable Observable[T]) Catch(other Observable[T]) Observable[T]
func (Observable[T]) CatchError ¶
func (observable Observable[T]) CatchError(selector func(err error, caught Observable[T]) Observable[T]) Observable[T]
func (Observable[T]) ConcatWith ¶
func (observable Observable[T]) ConcatWith(others ...Observable[T]) Observable[T]
func (Observable[T]) Count ¶
func (observable Observable[T]) Count() Observable[int]
func (Observable[T]) Delay ¶
func (observable Observable[T]) Delay(duration time.Duration) Observable[T]
func (Observable[T]) DistinctUntilChanged ¶
func (observable Observable[T]) DistinctUntilChanged(equal func(T, T) bool) Observable[T]
func (Observable[T]) Do ¶
func (observable Observable[T]) Do(f func(T)) Observable[T]
func (Observable[T]) ElementAt ¶
func (observable Observable[T]) ElementAt(n int) Observable[T]
func (Observable[T]) EndWith ¶ added in v0.2.2
func (observable Observable[T]) EndWith(values ...T) Observable[T]
func (Observable[T]) Filter ¶
func (observable Observable[T]) Filter(predicate func(T) bool) Observable[T]
func (Observable[T]) First ¶
func (observable Observable[T]) First(schedulers ...Scheduler) (value T, err error)
func (Observable[T]) Fprint ¶ added in v0.2.0
func (observable Observable[T]) Fprint(out io.Writer) Observable[T]
func (Observable[T]) Fprintf ¶ added in v0.2.0
func (observable Observable[T]) Fprintf(out io.Writer, format string) Observable[T]
func (Observable[T]) Fprintln ¶ added in v0.2.0
func (observable Observable[T]) Fprintln(out io.Writer) Observable[T]
func (Observable[T]) Go ¶ added in v0.2.0
func (observable Observable[T]) Go(schedulers ...Scheduler) Subscription
Go subscribes to the observable and starts execution on a separate goroutine. It ignores all emissions from the observable sequence, making it useful when you only care about side effects and not the actual values. By default, it uses the Goroutine scheduler, but an optional scheduler can be provided. Returns a Subscription that can be used to cancel the subscription when no longer needed.
func (Observable[T]) Last ¶
func (observable Observable[T]) Last(schedulers ...Scheduler) (value T, err error)
func (Observable[T]) Map ¶
func (observable Observable[T]) Map(project func(T) any) Observable[any]
func (Observable[T]) MapE ¶ added in v0.2.1
func (observable Observable[T]) MapE(project func(T) (any, error)) Observable[any]
func (Observable[T]) Marshal ¶ added in v0.2.0
func (observable Observable[T]) Marshal(marshal func(any) ([]byte, error)) Observable[[]byte]
func (Observable[T]) MergeWith ¶
func (observable Observable[T]) MergeWith(others ...Observable[T]) Observable[T]
func (Observable[T]) OnComplete ¶ added in v0.2.2
func (observable Observable[T]) OnComplete(f func()) Observable[T]
func (Observable[T]) OnDone ¶ added in v0.2.2
func (observable Observable[T]) OnDone(f func(error)) Observable[T]
func (Observable[T]) OnError ¶ added in v0.2.2
func (observable Observable[T]) OnError(f func(error)) Observable[T]
func (Observable[T]) OnNext ¶ added in v0.2.2
func (observable Observable[T]) OnNext(f func(T)) Observable[T]
func (Observable[T]) Passthrough ¶ added in v0.2.0
func (observable Observable[T]) Passthrough() Observable[T]
func (Observable[T]) Pipe ¶ added in v0.2.0
func (observable Observable[T]) Pipe(segments ...Pipe[T]) Observable[T]
func (Observable[T]) Print ¶ added in v0.2.0
func (observable Observable[T]) Print() Observable[T]
func (Observable[T]) Printf ¶ added in v0.2.0
func (observable Observable[T]) Printf(format string) Observable[T]
func (Observable[T]) Println ¶
func (observable Observable[T]) Println() Observable[T]
func (Observable[T]) Publish ¶
func (observable Observable[T]) Publish() Connectable[T]
Publish returns a Connectable[T], a multicasting Observable[T] that wraps the underlying Observable[T].
func (Observable[T]) RaceWith ¶ added in v0.2.0
func (observable Observable[T]) RaceWith(others ...Observable[T]) Observable[T]
func (Observable[T]) Repeat ¶
func (observable Observable[T]) Repeat(count ...int) Observable[T]
Repeat emits the items emitted by the source Observable repeatedly.
Parameters:
- count: Optional. The number of repetitions:
  - If omitted: The source Observable is repeated indefinitely
  - If 0: Returns an empty Observable
  - If negative: Returns an Observable that emits an error
  - If more than one count value is passed: Returns an Observable that emits an error
The resulting Observable will subscribe to the source Observable repeatedly each time the source completes, up to the specified count.
func (Observable[T]) Retry ¶
func (observable Observable[T]) Retry(limit ...int) Observable[T]
func (Observable[T]) RetryTime ¶ added in v0.2.0
func (observable Observable[T]) RetryTime(backoff func(int) time.Duration, limit ...int) Observable[T]
func (Observable[T]) SampleTime ¶
func (observable Observable[T]) SampleTime(window time.Duration) Observable[T]
SampleTime emits the most recent item emitted by an Observable within periodic time intervals.
func (Observable[T]) Send ¶ added in v0.2.0
func (observable Observable[T]) Send(ch chan<- T) Observable[T]
func (Observable[T]) Share ¶ added in v0.2.0
func (observable Observable[T]) Share() Observable[T]
Share returns a new Observable that multicasts (shares) the original Observable. As long as there is at least one subscriber, it stays subscribed to the source Observable and keeps emitting data. When all subscribers have unsubscribed, it unsubscribes from the source Observable. Because the Observable is multicasting, it makes the stream hot.
This method is useful when you have an Observable that is expensive to create or has side-effects, but you want to share the results of that Observable with multiple subscribers. By using `Share`, you can avoid creating multiple instances of the Observable and ensure that all subscribers receive the same data.
func (Observable[T]) Skip ¶
func (observable Observable[T]) Skip(n int) Observable[T]
func (Observable[T]) Slice ¶ added in v0.2.0
func (observable Observable[T]) Slice(schedulers ...Scheduler) (slice []T, err error)
func (Observable[T]) StartWith ¶
func (observable Observable[T]) StartWith(values ...T) Observable[T]
func (Observable[T]) Subscribe ¶
func (observable Observable[T]) Subscribe(observe Observer[T], scheduler Scheduler) Subscription
func (Observable[T]) SubscribeOn ¶
func (observable Observable[T]) SubscribeOn(scheduler ConcurrentScheduler) Observable[T]
func (Observable[T]) Take ¶
func (observable Observable[T]) Take(n int) Observable[T]
Take returns an Observable that emits only the first n values emitted by the source Observable. If the source emits fewer than n values, then all of its values are emitted. After that, it completes, regardless of whether the source completes.
func (Observable[T]) TakeWhile ¶
func (observable Observable[T]) TakeWhile(condition func(T) bool) Observable[T]
func (Observable[T]) Tap ¶ added in v0.2.0
func (observable Observable[T]) Tap(tap Observer[T]) Observable[T]
func (Observable[T]) Values ¶ added in v0.2.0
func (observable Observable[T]) Values(scheduler ...Scheduler) iter.Seq[T]
func (Observable[T]) Wait ¶
func (observable Observable[T]) Wait(schedulers ...Scheduler) error
type Observer ¶
Observer[T] represents a consumer of values delivered by an Observable. It is implemented as a function that takes three parameters:
- next: the next value emitted by the Observable
- err: any error that occurred during emission (nil if no error)
- done: a boolean indicating whether the Observable has completed
Observers follow the reactive pattern by receiving a stream of events (values, errors, or completion signals) and reacting to them accordingly.
func AsObserver ¶ added in v0.2.0
AsObserver converts an Observer of type `any` to an Observer of a specific type T. This allows adapting a generic Observer to a more specific type context.
func Ignore ¶ added in v0.2.0
Ignore creates an Observer that simply discards any emissions from an Observable. It is useful when you need to create an Observer but don't care about its values.
func (Observer[T]) AsObserver ¶
AsObserver converts a typed Observer[T] to a generic Observer[any]. It handles type conversion from 'any' back to T, and will emit an ErrTypecastFailed error when conversion fails.
func (Observer[T]) Done ¶ added in v0.2.2
Done signals that the Observable has completed emitting values, optionally with an error. If err is nil, it indicates normal completion. If err is non-nil, it indicates that the Observable terminated with an error.
After Done is called, the Observable will not emit any more values, regardless of whether the completion was successful or due to an error.
type Pipe ¶ added in v0.2.0
type Pipe[T any] func(Observable[T]) Observable[T]
func Append ¶ added in v0.2.1
Append creates a pipe that appends each emitted value to the provided slice. It passes each value through to the next observer after appending it. This allows collecting all emitted values in a slice while still forwarding them. Only values emitted before completion (done=false) are appended.
func Assign ¶ added in v0.2.0
Assign creates a pipe that assigns every next value that is not done to the provided variable. The pipe will forward all events (next, err, done) to the next observer.
func AutoUnsubscribe ¶ added in v0.2.2
func Catch ¶ added in v0.2.0
func Catch[T any](other Observable[T]) Pipe[T]
func CatchError ¶ added in v0.2.0
func CatchError[T any](selector func(err error, caught Observable[T]) Observable[T]) Pipe[T]
func ConcatWith ¶ added in v0.2.1
func ConcatWith[T any](others ...Observable[T]) Pipe[T]
func DistinctUntilChanged ¶ added in v0.2.0
func MergeWith ¶ added in v0.2.1
func MergeWith[T any](others ...Observable[T]) Pipe[T]
func OnComplete ¶ added in v0.2.1
func Passthrough ¶ added in v0.2.0
func RaceWith ¶ added in v0.2.1
func RaceWith[T any](others ...Observable[T]) Pipe[T]
func Repeat ¶ added in v0.2.2
Repeat creates an Observable that emits the entire source sequence multiple times.
Parameters:
- count: Optional. The number of repetitions:
  - If omitted: The source Observable is repeated indefinitely
  - If 0: Returns an empty Observable
  - If negative: Returns an Observable that emits an error
  - If more than one count value is passed: Returns an Observable that emits an error
The resulting Observable will subscribe to the source Observable repeatedly each time the source completes, up to the specified count.
type Signed ¶ added in v0.2.2
Signed is a constraint that permits any signed integer type. If future releases of Go add new predeclared signed integer types, this constraint will be modified to include them.
type Subscriber ¶
type Subscriber interface {
// Subscribed returns true if the subscriber is in a subscribed state.
// Returns false once Unsubscribe has been called.
Subscribed() bool
// Unsubscribe changes the state to unsubscribed and executes all registered
// callback functions. Does nothing if already unsubscribed.
Unsubscribe()
// Add creates and returns a new child Subscriber.
// If the parent is already unsubscribed, the child will be created in an
// unsubscribed state. Otherwise, the child will be unsubscribed when the parent
// is unsubscribed.
Add() Subscriber
// OnUnsubscribe registers a callback function to be executed when Unsubscribe is called.
// If the subscriber is already unsubscribed, the callback is executed immediately.
// If callback is nil, this method does nothing.
OnUnsubscribe(callback func())
}
Subscriber is a subscribable entity that allows construction of a Subscriber tree.
type Subscription ¶
type Subscription interface {
// Subscribed returns true until Unsubscribe is called.
Subscribed() bool
// Unsubscribe will change the state to unsubscribed.
Unsubscribe()
// Done returns a channel that is closed when the subscription state changes to unsubscribed.
// This channel can be used with select statements to react to subscription termination events.
// If the scheduler is not concurrent, it will spawn a goroutine to wait for the scheduler.
Done() <-chan struct{}
// Err returns the subscription's terminal state:
// - nil if the observable completed successfully
// - the observable's error if it terminated with an error
// - SubscriptionCanceled if the subscription was manually unsubscribed
// - SubscriptionActive if the subscription is still active
Err() error
// Wait blocks until the subscription state becomes unsubscribed.
// If the subscription is already unsubscribed, it returns immediately.
// If the scheduler is not concurrent, it will wait for the scheduler to complete.
// Returns:
// - nil if the observable completed successfully
// - the observable's error if it terminated with an error
// - SubscriptionCanceled if the subscription was manually unsubscribed
Wait() error
}
Subscription is an interface that allows monitoring and controlling a subscription. It provides methods for tracking the subscription's lifecycle.
type Tuple4 ¶ added in v0.2.1
type Tuple4[T, U, V, W any] struct {
	First  T
	Second U
	Third  V
	Fourth W
}
Source Files
¶
- all.go
- append.go
- asobservable.go
- assign.go
- autoconnect.go
- autounsubscribe.go
- buffercount.go
- catch.go
- catcherror.go
- combineall.go
- combinelatest.go
- concat.go
- concatall.go
- concatmap.go
- concatwith.go
- connectable.go
- connector.go
- constraints.go
- count.go
- create.go
- creator.go
- defer.go
- delay.go
- distinctuntilchanged.go
- do.go
- doc.go
- elementat.go
- empty.go
- endwith.go
- equal.go
- err.go
- exhaustall.go
- exhaustmap.go
- filter.go
- first.go
- fprint.go
- fprintf.go
- fprintln.go
- from.go
- go.go
- interval.go
- last.go
- map.go
- mape.go
- marshal.go
- maxbuffersize.go
- merge.go
- mergeall.go
- mergemap.go
- mergewith.go
- multicast.go
- must.go
- never.go
- observable.go
- observer.go
- of.go
- oncomplete.go
- ondone.go
- onerror.go
- onnext.go
- passthrough.go
- pipe.go
- print.go
- printf.go
- println.go
- publish.go
- pull.go
- race.go
- racewith.go
- recv.go
- reduce.go
- reducee.go
- refcount.go
- repeat.go
- retry.go
- retrytime.go
- sampletime.go
- scan.go
- scane.go
- scheduler.go
- send.go
- share.go
- skip.go
- slice.go
- startwith.go
- subject.go
- subscribe.go
- subscribeon.go
- subscriber.go
- subscription.go
- switchall.go
- switchmap.go
- take.go
- takewhile.go
- tap.go
- throw.go
- ticker.go
- timer.go
- tuple.go
- values.go
- wait.go
- withlatestfrom.go
- withlatestfromall.go
- zip.go
- zipall.go