tools

package module
v0.31.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 26, 2025 License: MIT Imports: 3 Imported by: 33

README

task-tools

CircleCI

A set of tools and apps used in the task ecosystem

Getting Started

Creating a worker
const desc = "cli desciption"  
type options struct {
	// worker specific config values 
}

func (o *options) Validate() error {
	//todo: validate options struct and error on missing/invalid inputs
	return nil 
}

func main() {
	opts := &options{}
	app := bootstrap.NewWorkerApp("app Name", opts.NewWorker, opts).
		Description(desc).
		Version(tools.Version).Initialize()

	app.Run()
}

func (o *options) NewWorker(info string) task.Worker {
  // TODO: parse the info string and setup a new Worker
  return &worker{
    Meta:    task.NewMeta(),
  } 
}

type worker struct {
	task.Meta
}

func (w *worker) DoTask(ctx context.Context) (task.Result, string) {
  // TODO: Process the requested job and return Complete, error and details about the job. 
  return task.Completed("All done")
}

Creating a taskmaster
const desc = "cli description"

type options struct {}

func (o *options) Validate() error {
	//todo: validate options struct and error on missing/invalid inputs
	return nil 
}

func main() {
	opts := &options{}
	app := bootstrap.NewTaskMaster("appName", New, opts).
		Version(tools.String()).
		Description(desc)
	app.Initialize()
	app.Run()
}

type taskMaster struct {} 

func New(app *bootstrap.TaskMaster) bootstrap.Runner {
	return &taskMaster{}
}

func (tm *taskMaster) Info() interface{} {
	// provide a struct of data to be display on the /info status endpoint
	return struct{}{}
}

func (tm *taskMaster) Run(ctx context.Context) error {
	// main running process
	// read from consumer and produce tasks as required
	return nil 
}

Pre-built Apps

Flowlord

Production-ready task orchestration engine for managing complex workflow dependencies with intelligent scheduling, automatic retries, and real-time monitoring. Features include:

  • Workflow Management - Multi-phase workflows with parent-child task dependencies
  • Intelligent Scheduling - Cron-based scheduling with template-based task generation
  • Optional SQLite Cache - Task history, alerts, and file tracking for troubleshooting (non-critical, stateless operation)
  • Web Dashboard - Real-time monitoring UI with filtering, pagination, and date navigation
  • Batch Processing - Generate multiple tasks from date ranges, metadata arrays, or data files
  • RESTful API - Comprehensive API for backloading, monitoring, and workflow management

See detailed documentation.

Workers
  • bq-load: BigQuery Loader
  • sql-load: Postgres/MySQL Optimized Idempotent loader
  • sql-readx: Postgres/MySQL reader with ability to execute admin query
    • perfect for scheduling admin tasks like partition creation
  • db-check: Monitoring tools to verify data is being populated as expect in DB
  • transform: generic json modification worker that uses gojq

Utilities

File Tools

read/write from local, s3, gcs, minio with the same tool. Use a URL to distinguish between the providers.

  • s3://bucket/folder
  • gs://bucket/folder
  • mc://bucket/folder
  • local/folder/
opts :=  &file.Options{ AccessKey: "123", SecretKey: "secret123"}
list details about all files with a remote s3 directory
for _, f := range file.List("s3://bucket/folder/", opts) {
  fmt.Println(f.JSONString())
} 
read and process data from a file
  reader, err := file.NewReader("gs://bucket/folder/file.txt", opts)
  if err != nil {
    log.Fatal(err) 
  }
  scanner := file.NewScanner(reader) 
  for scanner.Scan() { // go through each line of the file
    fmt.Println(scanner.Text()) 
    // process data 
  }
iterator through a file
// basic case 
for l := range NewIterator("../internal/test/nop.sql", nil).Lines() {
  //TODO: do something with the line
  data += string(l)
}

// handle errors and get stats 
it := NewIterator("../internal/test/nop.sql", nil)
for l := range it.Lines() {
  //TODO: do something with the line
  data += string(l)
}
if it.Error() != nil {
  return it.Error() 
}
fmt.Println(it.Stats().JSONString())
write to a file
writer, err := file.NewWriter("s3://bucket/folder/data.txt", opts) 
data = []any{} // some sort of data 
for _, d := range data {
  b, _ := json.Marshal(data) 
  writer.WriteLine(b) // vs writer.Write(b) 
}
if fatalError {
  // don't commit the file and cancel everything 
  writer.Abort()
  return 
}

writer.Close()  
Slack

Utility to send messages to slack.

func main() {
    notify := slack.Slack{
        Url: "https://hooks.slack.com/services/ORG_ID/APP_IP/CHANNEL_ID
    }
    notify.Notify("Hello World", slack.OK) 
}

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// specify version, BuildTimeUTC, AppName at build time with `-ldflags "-X path.to.package.Version x.x.x"` etc...
	Version      = "-"
	BuildTimeUTC = "-"
)

Functions

func ShowVersion added in v0.21.0

func ShowVersion(show bool)

func String

func String() string

Types

This section is empty.

Directories

Path Synopsis
apps module
db
buf
nop

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL