package cmd

v1.7.2
Published: Nov 24, 2025 License: MIT

Documentation

Index

Constants

const (
	StageSkipped   = "Skipped"
	StageCancelled = "Cancelled"
	StageSetup     = "Setup"
)

Stage constants

const (
	DurationHourly  = "hourly"
	DurationDaily   = "daily"
	DurationWeekly  = "weekly"
	DurationMonthly = "monthly"
	DurationYearly  = "yearly"
)

Duration constants for output splitting

Variables

var (
	ErrInsufficientPermissions  = errors.New("insufficient permissions to read table")
	ErrPartitionNoPermissions   = errors.New("partition tables exist but you don't have SELECT permissions")
	ErrS3ClientNotInitialized   = errors.New("S3 client not initialized")
	ErrS3UploaderNotInitialized = errors.New("S3 uploader not initialized")
)

Static errors for table permission checks and S3 client initialization

var (
	ErrDatabaseUserRequired    = errors.New("database user is required")
	ErrDatabaseNameRequired    = errors.New("database name is required")
	ErrDatabasePortInvalid     = errors.New("database port must be between 1 and 65535")
	ErrStatementTimeoutInvalid = errors.New("database statement timeout must be >= 0")
	ErrMaxRetriesInvalid       = errors.New("database max retries must be >= 0")
	ErrRetryDelayInvalid       = errors.New("database retry delay must be >= 0")
	ErrS3EndpointRequired      = errors.New("S3 endpoint is required")
	ErrS3BucketRequired        = errors.New("S3 bucket is required")
	ErrS3AccessKeyRequired     = errors.New("S3 access key is required")
	ErrS3SecretKeyRequired     = errors.New("S3 secret key is required")
	ErrS3RegionInvalid         = errors.New("S3 region contains invalid characters or is too long")
	ErrTableNameRequired       = errors.New("table name is required")
	ErrTableNameInvalid        = errors.New("table name is invalid: must be 1-63 characters, start with a letter or underscore, and contain only letters, numbers, and underscores")
	ErrStartDateFormatInvalid  = errors.New("invalid start date format")
	ErrEndDateFormatInvalid    = errors.New("invalid end date format")
	ErrWorkersMinimum          = errors.New("workers must be at least 1")
	ErrWorkersMaximum          = errors.New("workers must not exceed 1000")
	ErrChunkSizeMinimum        = errors.New("chunk size must be at least 100")
	ErrChunkSizeMaximum        = errors.New("chunk size must not exceed 1000000")
	ErrPathTemplateRequired    = errors.New("path template is required")
	ErrPathTemplateInvalid     = errors.New("path template must contain {table} placeholder")
	ErrOutputDurationInvalid   = errors.New("output duration must be one of: hourly, daily, weekly, monthly, yearly")
	ErrOutputFormatInvalid     = errors.New("output format must be one of: jsonl, csv, parquet")
	ErrCompressionInvalid      = errors.New("compression must be one of: zstd, lz4, gzip, none")
	ErrCompressionLevelInvalid = errors.New("compression level must be between 1 and 22 (zstd), 1-9 (lz4/gzip)")
	ErrDateColumnInvalid       = errors.New("date column is invalid: must start with a letter or underscore, and contain only letters, numbers, and underscores")
	ErrDumpModeInvalid         = errors.New("dump mode must be one of: schema-only, data-only, schema-and-data")
)

Static errors for configuration validation

var ErrTableHasNoColumns = errors.New("table exists but has no columns")

ErrTableHasNoColumns is returned when a table exists but has no columns

var ErrTableNotFound = errors.New("table not found")

ErrTableNotFound is returned when a table does not exist

var ErrTableNotFoundOrEmpty = errors.New("table not found or has no columns")

ErrTableNotFoundOrEmpty is returned when a table is not found or has no columns

var (
	ErrVersionCheckFailed = errors.New("version check failed")
)

Static errors for version checking

var (
	// Version information - set via ldflags during build
	// Example: go build -ldflags "-X github.com/airframesio/data-archiver/cmd.Version=1.2.3"
	Version = "dev" // Default to "dev" if not set during build

)

Functions

func Execute

func Execute() error

func GenerateFilename

func GenerateFilename(tableName string, timestamp time.Time, duration string, formatExt string, compressionExt string) string

GenerateFilename builds an output filename from the table name, timestamp, duration, format extension, and compression extension
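
The call below is an illustrative sketch; the table name and the "jsonl"/"zst" extensions are placeholder values, and the exact filename layout produced for each duration is determined by the implementation.

package main

import (
	"fmt"
	"time"

	"github.com/airframesio/data-archiver/cmd"
)

func main() {
	// Placeholder values only; in practice the extensions come from the
	// configured output format and compression settings.
	ts := time.Date(2025, 11, 24, 0, 0, 0, 0, time.UTC)
	name := cmd.GenerateFilename("messages", ts, cmd.DurationDaily, "jsonl", "zst")
	fmt.Println(name)
}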

func GetPIDFilePath

func GetPIDFilePath() string

GetPIDFilePath returns the path to the PID file

func GetTaskFilePath

func GetTaskFilePath() string

GetTaskFilePath returns the path to the task info file

func GetTimeRangeForDuration

func GetTimeRangeForDuration(baseTime time.Time, duration string) (time.Time, time.Time)

GetTimeRangeForDuration returns the start and end time for a given duration
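
An illustrative call using daily granularity; the exact boundary semantics (inclusive or exclusive end, truncation rules) are defined by the implementation.

package main

import (
	"fmt"
	"time"

	"github.com/airframesio/data-archiver/cmd"
)

func main() {
	base := time.Date(2025, 11, 24, 15, 30, 0, 0, time.UTC)
	// For DurationDaily this presumably spans the day containing base.
	start, end := cmd.GetTimeRangeForDuration(base, cmd.DurationDaily)
	fmt.Println(start, end)
}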

func IsProcessRunning

func IsProcessRunning(pid int) bool

IsProcessRunning checks whether a process with the given PID is running. Works on both Unix and Windows systems
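
A sketch of how ReadPIDFile and IsProcessRunning might be combined to detect an already-running archiver; removing a stale PID file as shown is an assumption about caller behaviour, not something this package mandates.

package main

import (
	"fmt"

	"github.com/airframesio/data-archiver/cmd"
)

func main() {
	pid, err := cmd.ReadPIDFile()
	if err != nil {
		fmt.Println("no PID file:", err)
		return
	}
	if cmd.IsProcessRunning(pid) {
		fmt.Printf("archiver already running with PID %d\n", pid)
		return
	}
	// Stale PID file; assumed safe to clean up before starting a new run.
	_ = cmd.RemovePIDFile()
}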

func ReadPIDFile

func ReadPIDFile() (int, error)

ReadPIDFile reads the PID from file

func RemovePIDFile

func RemovePIDFile() error

RemovePIDFile removes the PID file

func RemoveTaskFile

func RemoveTaskFile() error

RemoveTaskFile removes the task info file

func SetSignalContext

func SetSignalContext(ctx context.Context, stopFile string)

SetSignalContext stores the signal-aware context created in main(). This must be called before Execute() to ensure proper signal handling
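
A minimal sketch of the wiring described above, using a signal.NotifyContext created in main(); the stop-file path is a hypothetical placeholder.

package main

import (
	"context"
	"os"
	"os/signal"
	"syscall"

	"github.com/airframesio/data-archiver/cmd"
)

func main() {
	// Cancel the context on SIGINT/SIGTERM so a running archive can shut down cleanly.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	// "/tmp/data-archiver.stop" is a hypothetical stop-file path; use whatever
	// the application actually expects here.
	cmd.SetSignalContext(ctx, "/tmp/data-archiver.stop")

	if err := cmd.Execute(); err != nil {
		os.Exit(1)
	}
}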

func SplitPartitionByDuration

func SplitPartitionByDuration(partitionStart, partitionEnd time.Time, duration string) []struct {
	Start time.Time
	End   time.Time
}

SplitPartitionByDuration splits a partition's date range into multiple time ranges based on duration
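
For illustration, splitting a month-long partition into daily slices might look like the following; the dates are placeholder values.

package main

import (
	"fmt"
	"time"

	"github.com/airframesio/data-archiver/cmd"
)

func main() {
	start := time.Date(2025, 11, 1, 0, 0, 0, 0, time.UTC)
	end := time.Date(2025, 12, 1, 0, 0, 0, 0, time.UTC)
	for _, r := range cmd.SplitPartitionByDuration(start, end, cmd.DurationDaily) {
		fmt.Println(r.Start, "->", r.End)
	}
}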

func WritePIDFile

func WritePIDFile() error

WritePIDFile writes the current process PID to a file

func WriteTaskInfo

func WriteTaskInfo(info *TaskInfo) error

WriteTaskInfo writes current task information to file

Types

type Archiver

type Archiver struct {
	// contains filtered or unexported fields
}

func NewArchiver

func NewArchiver(config *Config, logger *slog.Logger) *Archiver

func (*Archiver) ProcessPartitionWithProgress

func (a *Archiver) ProcessPartitionWithProgress(partition PartitionInfo, program *tea.Program) ProcessResult

func (*Archiver) Run

func (a *Archiver) Run(ctx context.Context) error
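
A minimal end-to-end sketch of constructing and running an Archiver; every configuration value below is a placeholder, and in the real CLI these are populated from flags and environment variables rather than hard-coded.

package main

import (
	"context"
	"log/slog"
	"os"

	"github.com/airframesio/data-archiver/cmd"
)

func main() {
	logger := slog.New(slog.NewTextHandler(os.Stderr, nil))

	// Placeholder configuration; see Config.Validate for the constraints
	// each field must satisfy.
	cfg := &cmd.Config{
		Workers:          4,
		ChunkSize:        10000,
		Table:            "messages",
		OutputDuration:   cmd.DurationDaily,
		OutputFormat:     "jsonl",
		Compression:      "zstd",
		CompressionLevel: 3,
		Database: cmd.DatabaseConfig{
			Host: "localhost", Port: 5432, User: "archiver", Name: "flights",
		},
		S3: cmd.S3Config{
			Endpoint: "s3.example.com", Bucket: "archive",
			AccessKey: "example-access-key", SecretKey: "example-secret-key",
			PathTemplate: "{table}/{YYYY}/{MM}/{DD}",
		},
	}
	if err := cfg.Validate(); err != nil {
		logger.Error("invalid configuration", "err", err)
		os.Exit(1)
	}

	a := cmd.NewArchiver(cfg, logger)
	if err := a.Run(context.Background()); err != nil {
		logger.Error("archive run failed", "err", err)
		os.Exit(1)
	}
}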

type CacheEntry

type CacheEntry struct {
	Table            string    `json:"table"`
	Partition        string    `json:"partition"`
	RowCount         int64     `json:"rowCount"`
	CountTime        time.Time `json:"countTime"`
	FileSize         int64     `json:"fileSize"`
	UncompressedSize int64     `json:"uncompressedSize"`
	FileMD5          string    `json:"fileMD5"`
	MultipartETag    string    `json:"multipartETag,omitempty"` // S3 multipart ETag for files >100MB
	FileTime         time.Time `json:"fileTime"`
	S3Key            string    `json:"s3Key"`
	S3Uploaded       bool      `json:"s3Uploaded"`
	S3UploadTime     time.Time `json:"s3UploadTime"`
	LastError        string    `json:"lastError"`
	ErrorTime        time.Time `json:"errorTime"`
	ProcessStartTime time.Time `json:"processStartTime,omitempty"` // When processing started for this job
}

type CacheResponse

type CacheResponse struct {
	Tables    []TableCache `json:"tables"`
	Timestamp time.Time    `json:"timestamp"`
}

type CacheScope added in v1.6.0

type CacheScope struct {
	Command    string
	Table      string
	OutputPath string
}

CacheScope represents a unique namespace for cached metadata. It combines the executing subcommand with the fully-qualified output path so that different commands or destinations do not clobber each other's cache.

func NewCacheScope added in v1.6.0

func NewCacheScope(command string, cfg *Config) CacheScope

NewCacheScope builds a cache scope for the provided command/config pair.

type ColumnInfo added in v1.4.0

type ColumnInfo struct {
	Name     string
	DataType string
	UDTName  string // PostgreSQL user-defined type name (e.g., int4, varchar, timestamp)
}

ColumnInfo represents metadata about a database column

func (*ColumnInfo) GetName added in v1.4.0

func (c *ColumnInfo) GetName() string

GetName implements formatters.ColumnSchema

func (*ColumnInfo) GetType added in v1.4.0

func (c *ColumnInfo) GetType() string

GetType implements formatters.ColumnSchema

type ColumnTypeMismatch added in v1.5.5

type ColumnTypeMismatch struct {
	ColumnName  string `json:"column_name"`
	Source1Type string `json:"source1_type"`
	Source2Type string `json:"source2_type"`
}

ColumnTypeMismatch represents a column type difference

type CompareConfig added in v1.5.5

type CompareConfig struct {
	Mode            string // schema-only, data-only, schema-and-data
	DataCompareType string // row-count, row-by-row, sample
	SampleSize      int
	Tables          []string // Empty = all tables
	OutputFormat    string   // text, json
	OutputFile      string
	Debug           bool
	DryRun          bool
}

CompareConfig contains comparison configuration

type Comparer added in v1.5.5

type Comparer struct {
	// contains filtered or unexported fields
}

Comparer handles comparison operations

func NewComparer added in v1.5.5

func NewComparer(source1, source2 *ComparisonSource, config *CompareConfig, logger *slog.Logger) *Comparer

NewComparer creates a new Comparer instance

func (*Comparer) Run added in v1.5.5

func (c *Comparer) Run(ctx context.Context) error

Run executes the comparison
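
An illustrative sketch comparing a live database against an S3 archive; connection details and paths are placeholders, and the mode strings are taken from the documented options on CompareConfig.

package main

import (
	"context"
	"log/slog"
	"os"

	"github.com/airframesio/data-archiver/cmd"
)

func main() {
	logger := slog.New(slog.NewTextHandler(os.Stderr, nil))

	// Source 1: a live database (placeholder connection details).
	source1 := &cmd.ComparisonSource{
		Type:     "db",
		Database: cmd.DatabaseConfig{Host: "localhost", Port: 5432, User: "archiver", Name: "flights"},
	}
	// Source 2: previously archived data in S3 (placeholder endpoint and path).
	source2 := &cmd.ComparisonSource{
		Type:     "s3",
		S3:       cmd.S3Config{Endpoint: "s3.example.com", Bucket: "archive", AccessKey: "example-access-key", SecretKey: "example-secret-key"},
		DataPath: "messages/",
	}

	compareCfg := &cmd.CompareConfig{
		Mode:            "schema-and-data",
		DataCompareType: "row-count",
		OutputFormat:    "text",
	}

	c := cmd.NewComparer(source1, source2, compareCfg, logger)
	if err := c.Run(context.Background()); err != nil {
		logger.Error("comparison failed", "err", err)
		os.Exit(1)
	}
}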

type ComparisonResult added in v1.5.5

type ComparisonResult struct {
	Schema *SchemaComparisonResult `json:"schema,omitempty"`
	Data   *DataComparisonResult   `json:"data,omitempty"`
}

ComparisonResult contains the results of a comparison

type ComparisonSource added in v1.5.5

type ComparisonSource struct {
	Type         string // "db" or "s3"
	Database     DatabaseConfig
	S3           S3Config
	SchemaPath   string // S3 path for schemas
	DataPath     string // S3 path for data
	SchemaSource string // pg_dump, inferred, auto
}

ComparisonSource represents a source for comparison (database or S3)

type Config

type Config struct {
	Debug                     bool
	LogFormat                 string
	DryRun                    bool
	Workers                   int
	SkipCount                 bool
	CacheViewer               bool
	ViewerPort                int
	ChunkSize                 int  // Number of rows to process in each chunk (streaming mode)
	IncludeNonPartitionTables bool // Include regular tables matching partition naming pattern
	Database                  DatabaseConfig
	S3                        S3Config
	Table                     string
	StartDate                 string
	EndDate                   string
	OutputDuration            string
	OutputFormat              string
	Compression               string
	CompressionLevel          int
	DateColumn                string
	DumpMode                  string // pg_dump mode: schema-only, data-only, schema-and-data
	CacheScope                CacheScope
}

func (*Config) Validate

func (c *Config) Validate() error
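
Because validation failures surface as the exported sentinel errors listed above, callers can branch on specific problems with errors.Is, as in this sketch.

package main

import (
	"errors"
	"fmt"

	"github.com/airframesio/data-archiver/cmd"
)

func main() {
	cfg := &cmd.Config{ /* populated elsewhere */ }

	if err := cfg.Validate(); err != nil {
		switch {
		case errors.Is(err, cmd.ErrDatabaseUserRequired):
			fmt.Println("database user is required")
		case errors.Is(err, cmd.ErrS3BucketRequired):
			fmt.Println("S3 bucket is required")
		default:
			fmt.Println("invalid configuration:", err)
		}
	}
}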

type DataComparisonResult added in v1.5.5

type DataComparisonResult struct {
	RowCountDiffs map[string]*RowCountDiff `json:"row_count_diffs,omitempty"`
	RowByRowDiffs map[string]*RowByRowDiff `json:"row_by_row_diffs,omitempty"`
	SampleDiffs   map[string]*SampleDiff   `json:"sample_diffs,omitempty"`
}

DataComparisonResult contains data comparison results

type DatabaseConfig

type DatabaseConfig struct {
	Host             string
	Port             int
	User             string
	Password         string
	Name             string
	SSLMode          string
	StatementTimeout int // Statement timeout in seconds (0 = no timeout, default 300)
	MaxRetries       int // Maximum number of retry attempts for failed queries (default 3)
	RetryDelay       int // Delay in seconds between retry attempts (default 5)
}

type GitHubRelease

type GitHubRelease struct {
	TagName     string    `json:"tag_name"`
	Name        string    `json:"name"`
	PublishedAt time.Time `json:"published_at"`
	HTMLURL     string    `json:"html_url"`
}

GitHubRelease represents the structure of GitHub's latest release API response

type LogMessage added in v1.5.0

type LogMessage struct {
	Timestamp string `json:"timestamp"`
	Level     string `json:"level"`
	Message   string `json:"message"`
}

type PartitionCache

type PartitionCache struct {
	Entries map[string]PartitionCacheEntry `json:"entries"`
}

PartitionCache stores both row counts and file metadata

type PartitionCacheEntry

type PartitionCacheEntry struct {
	// Row count information
	RowCount  int64     `json:"row_count"`
	CountTime time.Time `json:"count_time"`

	// File metadata (stored after processing)
	FileSize         int64     `json:"file_size,omitempty"`         // Compressed size
	UncompressedSize int64     `json:"uncompressed_size,omitempty"` // Original size
	FileMD5          string    `json:"file_md5,omitempty"`
	MultipartETag    string    `json:"multipart_etag,omitempty"` // S3 multipart ETag for files >100MB
	FileTime         time.Time `json:"file_time,omitempty"`

	// S3 information
	S3Key        string    `json:"s3_key,omitempty"`
	S3Uploaded   bool      `json:"s3_uploaded,omitempty"`
	S3UploadTime time.Time `json:"s3_upload_time,omitempty"`

	// Error tracking
	LastError string    `json:"last_error,omitempty"`
	ErrorTime time.Time `json:"error_time,omitempty"`

	// Processing time tracking
	ProcessStartTime time.Time `json:"process_start_time,omitempty"` // When processing started for this job
}

type PartitionInfo

type PartitionInfo struct {
	TableName  string
	Date       time.Time
	RowCount   int64
	RangeStart time.Time
	RangeEnd   time.Time
}

func (PartitionInfo) HasCustomRange added in v1.6.0

func (p PartitionInfo) HasCustomRange() bool

type PathTemplate

type PathTemplate struct {
	// contains filtered or unexported fields
}

PathTemplate provides functionality to generate S3 paths from templates

func NewPathTemplate

func NewPathTemplate(template string) *PathTemplate

NewPathTemplate creates a new PathTemplate instance

func (*PathTemplate) Generate

func (pt *PathTemplate) Generate(tableName string, timestamp time.Time) string

Generate replaces placeholders in the template with actual values. Supported placeholders: {table}, {YYYY}, {MM}, {DD}, {HH}
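
For illustration (the exact zero-padding and separator behaviour is up to the implementation):

package main

import (
	"fmt"
	"time"

	"github.com/airframesio/data-archiver/cmd"
)

func main() {
	pt := cmd.NewPathTemplate("{table}/{YYYY}/{MM}/{DD}")
	key := pt.Generate("messages", time.Date(2025, 11, 24, 7, 0, 0, 0, time.UTC))
	// Expected to expand to something like "messages/2025/11/24".
	fmt.Println(key)
}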

type PgDumpExecutor added in v1.5.4

type PgDumpExecutor struct {
	// contains filtered or unexported fields
}

PgDumpExecutor handles pg_dump operations with S3 upload

func NewPgDumpExecutor added in v1.5.4

func NewPgDumpExecutor(config *Config, logger *slog.Logger) *PgDumpExecutor

NewPgDumpExecutor creates a new pg_dump executor

func (*PgDumpExecutor) Run added in v1.5.4

func (e *PgDumpExecutor) Run(ctx context.Context) error

Run executes pg_dump and uploads to S3
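
A minimal sketch of running a pg_dump export and upload; all configuration values are placeholders, and DumpMode is one of the documented modes.

package main

import (
	"context"
	"log/slog"
	"os"

	"github.com/airframesio/data-archiver/cmd"
)

func main() {
	logger := slog.New(slog.NewTextHandler(os.Stderr, nil))

	// Placeholder configuration; DumpMode selects what pg_dump exports.
	cfg := &cmd.Config{
		DumpMode: "schema-only",
		Database: cmd.DatabaseConfig{Host: "localhost", Port: 5432, User: "archiver", Name: "flights"},
		S3:       cmd.S3Config{Endpoint: "s3.example.com", Bucket: "archive", AccessKey: "example-access-key", SecretKey: "example-secret-key"},
	}

	e := cmd.NewPgDumpExecutor(cfg, logger)
	if err := e.Run(context.Background()); err != nil {
		logger.Error("pg_dump run failed", "err", err)
		os.Exit(1)
	}
}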

type Phase

type Phase int

const (
	PhaseConnecting Phase = iota
	PhaseCheckingPermissions
	PhaseDiscovering
	PhaseCounting
	PhaseProcessing
	PhaseComplete
)

type ProcessResult

type ProcessResult struct {
	Partition    PartitionInfo
	Compressed   bool
	Uploaded     bool
	Skipped      bool
	SkipReason   string
	Error        error
	BytesWritten int64
	Stage        string
	S3Key        string        // S3 object key for uploaded file
	StartTime    time.Time     // When partition processing started
	Duration     time.Duration // How long partition processing took
}

type Restorer added in v1.5.3

type Restorer struct {
	// contains filtered or unexported fields
}

Restorer handles restoration of tables from S3

func NewRestorer added in v1.5.3

func NewRestorer(config *Config, logger *slog.Logger) *Restorer

NewRestorer creates a new Restorer instance

func (*Restorer) Run added in v1.5.3

func (r *Restorer) Run(ctx context.Context, restoreConfig map[string]string) error

Run executes the restore process

type RowByRowDiff added in v1.5.5

type RowByRowDiff struct {
	Source1TotalRows int64 `json:"source1_total_rows"`
	Source2TotalRows int64 `json:"source2_total_rows"`
	MatchingRows     int64 `json:"matching_rows"`
	MissingInSource2 int64 `json:"missing_in_source2"`
	ExtraInSource2   int64 `json:"extra_in_source2"`
}

RowByRowDiff contains row-by-row comparison results

type RowCountCache

type RowCountCache struct {
	Counts map[string]RowCountEntry `json:"counts"`
}

RowCountCache is the legacy cache structure, retained for backward compatibility

type RowCountDiff added in v1.5.5

type RowCountDiff struct {
	Source1Count int64 `json:"source1_count"`
	Source2Count int64 `json:"source2_count"`
	Difference   int64 `json:"difference"`
}

RowCountDiff contains row count differences

type RowCountEntry

type RowCountEntry struct {
	Count     int64     `json:"count"`
	Timestamp time.Time `json:"timestamp"`
}

type S3Config

type S3Config struct {
	Endpoint     string
	Bucket       string
	AccessKey    string
	SecretKey    string
	Region       string
	PathTemplate string
}

type S3File added in v1.5.3

type S3File struct {
	Key                 string
	Size                int64
	LastModified        time.Time
	DetectedFormat      string
	DetectedCompression string
	Date                time.Time // Extracted from filename
}

S3File represents a file found in S3

type SampleDiff added in v1.5.5

type SampleDiff struct {
	Source1Sample []map[string]interface{} `json:"source1_sample,omitempty"`
	Source2Sample []map[string]interface{} `json:"source2_sample,omitempty"`
	Differences   []string                 `json:"differences"`
}

SampleDiff contains sample comparison results

type SchemaComparisonResult added in v1.5.5

type SchemaComparisonResult struct {
	TablesOnlyInSource1 []string                    `json:"tables_only_in_source1"`
	TablesOnlyInSource2 []string                    `json:"tables_only_in_source2"`
	TableDiffs          map[string]*TableSchemaDiff `json:"table_diffs"`
}

SchemaComparisonResult contains schema comparison results

type StatusResponse

type StatusResponse struct {
	ArchiverRunning bool      `json:"archiverRunning"`
	PID             int       `json:"pid,omitempty"`
	CurrentTask     *TaskInfo `json:"currentTask,omitempty"`
	Version         string    `json:"version"`
	UpdateAvailable bool      `json:"updateAvailable"`
	LatestVersion   string    `json:"latestVersion,omitempty"`
	ReleaseURL      string    `json:"releaseUrl,omitempty"`
	// Slice tracking fields
	CurrentSliceIndex int    `json:"currentSliceIndex,omitempty"`
	TotalSlices       int    `json:"totalSlices,omitempty"`
	CurrentSliceDate  string `json:"currentSliceDate,omitempty"`
	IsSlicing         bool   `json:"isSlicing"`
}

type TableCache

type TableCache struct {
	TableName string       `json:"tableName"`
	Entries   []CacheEntry `json:"entries"`
}

type TableSchema added in v1.4.0

type TableSchema struct {
	TableName string
	Columns   []ColumnInfo
}

TableSchema represents the schema of a database table

func (*TableSchema) GetColumns added in v1.4.0

func (s *TableSchema) GetColumns() []formatters.ColumnSchema

GetColumns implements formatters.TableSchema

type TableSchemaDiff added in v1.5.5

type TableSchemaDiff struct {
	ColumnsOnlyInSource1 []ColumnInfo         `json:"columns_only_in_source1"`
	ColumnsOnlyInSource2 []ColumnInfo         `json:"columns_only_in_source2"`
	TypeMismatches       []ColumnTypeMismatch `json:"type_mismatches"`
}

TableSchemaDiff contains differences for a single table

type TaskInfo

type TaskInfo struct {
	PID              int       `json:"pid"`
	StartTime        time.Time `json:"start_time"`
	Table            string    `json:"table"`
	StartDate        string    `json:"start_date"`
	EndDate          string    `json:"end_date"`
	CurrentTask      string    `json:"current_task"`
	CurrentPartition string    `json:"current_partition,omitempty"`
	CurrentStep      string    `json:"current_step,omitempty"`
	Progress         float64   `json:"progress"`
	TotalItems       int       `json:"total_items"`
	CompletedItems   int       `json:"completed_items"`
	LastUpdate       time.Time `json:"last_update"`
	// Slice tracking fields
	CurrentSliceIndex int    `json:"current_slice_index,omitempty"`
	TotalSlices       int    `json:"total_slices,omitempty"`
	CurrentSliceDate  string `json:"current_slice_date,omitempty"`
	// Partition and slice statistics
	TotalPartitions     int `json:"total_partitions,omitempty"`     // Total partitions discovered
	PartitionsCounted   int `json:"partitions_counted,omitempty"`   // Partitions that have been counted
	PartitionsProcessed int `json:"partitions_processed,omitempty"` // Partitions that have been processed
	SlicesProcessed     int `json:"slices_processed,omitempty"`     // Total slices processed across all partitions
}

TaskInfo represents the current archiving task status

func ReadTaskInfo

func ReadTaskInfo() (*TaskInfo, error)

ReadTaskInfo reads current task information from file
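
A sketch of reading the task file for a simple status display; whether Progress is a fraction or a percentage is not specified here, so it is printed as-is.

package main

import (
	"fmt"

	"github.com/airframesio/data-archiver/cmd"
)

func main() {
	info, err := cmd.ReadTaskInfo()
	if err != nil {
		fmt.Println("no task info available:", err)
		return
	}
	fmt.Printf("%s: %s (%d/%d items, progress %.1f)\n",
		info.Table, info.CurrentTask, info.CompletedItems, info.TotalItems, info.Progress)
}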

type VersionCheckCache

type VersionCheckCache struct {
	UpdateAvailable bool      `json:"update_available"`
	LatestVersion   string    `json:"latest_version"`
	ReleaseURL      string    `json:"release_url"`
	Timestamp       time.Time `json:"timestamp"`
}

VersionCheckCache represents cached version check data

type VersionCheckResult

type VersionCheckResult struct {
	UpdateAvailable bool
	CurrentVersion  string
	LatestVersion   string
	ReleaseURL      string
	Error           error
}

VersionCheckResult contains the result of checking for updates

type WSMessage

type WSMessage struct {
	Type string      `json:"type"`
	Data interface{} `json:"data"`
}

WSMessage is the envelope for messages sent over the WebSocket connection; Type identifies the message type and Data carries the payload
