Documentation ¶
Index ¶
- Constants
- Variables
- func Execute() error
- func GenerateFilename(tableName string, timestamp time.Time, duration string, formatExt string, ...) string
- func GetPIDFilePath() string
- func GetTaskFilePath() string
- func GetTimeRangeForDuration(baseTime time.Time, duration string) (time.Time, time.Time)
- func IsProcessRunning(pid int) bool
- func ReadPIDFile() (int, error)
- func RemovePIDFile() error
- func RemoveTaskFile() error
- func SetSignalContext(ctx context.Context, stopFile string)
- func SplitPartitionByDuration(partitionStart, partitionEnd time.Time, duration string) []struct{ ... }
- func WritePIDFile() error
- func WriteTaskInfo(info *TaskInfo) error
- type Archiver
- type CacheEntry
- type CacheResponse
- type CacheScope
- type ColumnInfo
- type ColumnTypeMismatch
- type CompareConfig
- type Comparer
- type ComparisonResult
- type ComparisonSource
- type Config
- type DataComparisonResult
- type DatabaseConfig
- type GitHubRelease
- type LogMessage
- type PartitionCache
- type PartitionCacheEntry
- type PartitionInfo
- type PathTemplate
- type PgDumpExecutor
- type Phase
- type ProcessResult
- type Restorer
- type RowByRowDiff
- type RowCountCache
- type RowCountDiff
- type RowCountEntry
- type S3Config
- type S3File
- type SampleDiff
- type SchemaComparisonResult
- type StatusResponse
- type TableCache
- type TableSchema
- type TableSchemaDiff
- type TaskInfo
- type VersionCheckCache
- type VersionCheckResult
- type WSMessage
Constants ¶
const (
	StageSkipped   = "Skipped"
	StageCancelled = "Cancelled"
	StageSetup     = "Setup"
)
Stage constants
const (
	DurationHourly  = "hourly"
	DurationDaily   = "daily"
	DurationWeekly  = "weekly"
	DurationMonthly = "monthly"
	DurationYearly  = "yearly"
)
Duration constants for output splitting
Variables ¶
var (
	ErrInsufficientPermissions  = errors.New("insufficient permissions to read table")
	ErrPartitionNoPermissions   = errors.New("partition tables exist but you don't have SELECT permissions")
	ErrS3ClientNotInitialized   = errors.New("S3 client not initialized")
	ErrS3UploaderNotInitialized = errors.New("S3 uploader not initialized")
)
Error definitions
var (
	ErrDatabaseUserRequired    = errors.New("database user is required")
	ErrDatabaseNameRequired    = errors.New("database name is required")
	ErrDatabasePortInvalid     = errors.New("database port must be between 1 and 65535")
	ErrStatementTimeoutInvalid = errors.New("database statement timeout must be >= 0")
	ErrMaxRetriesInvalid       = errors.New("database max retries must be >= 0")
	ErrRetryDelayInvalid       = errors.New("database retry delay must be >= 0")
	ErrS3EndpointRequired      = errors.New("S3 endpoint is required")
	ErrS3BucketRequired        = errors.New("S3 bucket is required")
	ErrS3AccessKeyRequired     = errors.New("S3 access key is required")
	ErrS3SecretKeyRequired     = errors.New("S3 secret key is required")
	ErrS3RegionInvalid         = errors.New("S3 region contains invalid characters or is too long")
	ErrTableNameRequired       = errors.New("table name is required")
	ErrTableNameInvalid        = errors.New("table name is invalid: must be 1-63 characters, start with a letter or underscore, and contain only letters, numbers, and underscores")
	ErrStartDateFormatInvalid  = errors.New("invalid start date format")
	ErrEndDateFormatInvalid    = errors.New("invalid end date format")
	ErrWorkersMinimum          = errors.New("workers must be at least 1")
	ErrWorkersMaximum          = errors.New("workers must not exceed 1000")
	ErrChunkSizeMinimum        = errors.New("chunk size must be at least 100")
	ErrChunkSizeMaximum        = errors.New("chunk size must not exceed 1000000")
	ErrPathTemplateRequired    = errors.New("path template is required")
	ErrPathTemplateInvalid     = errors.New("path template must contain {table} placeholder")
	ErrOutputDurationInvalid   = errors.New("output duration must be one of: hourly, daily, weekly, monthly, yearly")
	ErrOutputFormatInvalid     = errors.New("output format must be one of: jsonl, csv, parquet")
	ErrCompressionInvalid      = errors.New("compression must be one of: zstd, lz4, gzip, none")
	ErrCompressionLevelInvalid = errors.New("compression level must be between 1 and 22 (zstd), 1-9 (lz4/gzip)")
	ErrDateColumnInvalid       = errors.New("date column is invalid: must start with a letter or underscore, and contain only letters, numbers, and underscores")
	ErrDumpModeInvalid         = errors.New("dump mode must be one of: schema-only, data-only, schema-and-data")
)
Static errors for configuration validation
var ErrTableHasNoColumns = errors.New("table exists but has no columns")
ErrTableHasNoColumns is returned when a table exists but has no columns
var ErrTableNotFound = errors.New("table not found")
ErrTableNotFound is returned when a table does not exist
var ErrTableNotFoundOrEmpty = errors.New("table not found or has no columns")
ErrTableNotFoundOrEmpty is returned when a table is not found or has no columns
var (
ErrVersionCheckFailed = errors.New("version check failed")
)
Static errors for version checking
var (
	// Version information - set via ldflags during build
	// Example: go build -ldflags "-X github.com/airframesio/data-archiver/cmd.Version=1.2.3"
	Version = "dev" // Default to "dev" if not set during build
)
Functions ¶
func GenerateFilename ¶
func GenerateFilename(tableName string, timestamp time.Time, duration string, formatExt string, compressionExt string) string
GenerateFilename creates a filename based on duration and timestamp
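A minimal illustrative call, assuming the import path shown in the Version ldflags example; the table name, timestamp, and extension arguments are placeholder values, and the exact filename layout is an implementation detail of the package:

package main

import (
	"fmt"
	"time"

	archiver "github.com/airframesio/data-archiver/cmd"
)

func main() {
	// Name the daily output slice that contains this timestamp.
	ts := time.Date(2024, 3, 15, 0, 0, 0, 0, time.UTC)
	name := archiver.GenerateFilename("messages", ts, archiver.DurationDaily, "jsonl", "zst")
	fmt.Println(name)
}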
func GetTaskFilePath ¶
func GetTaskFilePath() string
GetTaskFilePath returns the path to the task info file
func GetTimeRangeForDuration ¶
func GetTimeRangeForDuration(baseTime time.Time, duration string) (time.Time, time.Time)
GetTimeRangeForDuration returns the start and end time for a given duration
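An illustrative call; whether the returned boundaries are inclusive or exclusive is not documented in this excerpt:

package main

import (
	"fmt"
	"time"

	archiver "github.com/airframesio/data-archiver/cmd"
)

func main() {
	// Resolve the daily window that contains the base time.
	base := time.Date(2024, 3, 15, 14, 30, 0, 0, time.UTC)
	start, end := archiver.GetTimeRangeForDuration(base, archiver.DurationDaily)
	fmt.Println(start, end)
}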
func IsProcessRunning ¶
func IsProcessRunning(pid int) bool
IsProcessRunning checks whether a process with the given PID is running. Works on both Unix and Windows systems
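A sketch of a single-instance guard built from the PID helpers in this package; the PID file lives wherever GetPIDFilePath resolves to:

package main

import (
	"fmt"
	"os"

	archiver "github.com/airframesio/data-archiver/cmd"
)

func main() {
	// Refuse to start if another live archiver process owns the PID file.
	if pid, err := archiver.ReadPIDFile(); err == nil && archiver.IsProcessRunning(pid) {
		fmt.Fprintf(os.Stderr, "archiver already running with PID %d\n", pid)
		os.Exit(1)
	}
	if err := archiver.WritePIDFile(); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	defer archiver.RemovePIDFile()
	// ... run the archiver ...
}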
func SetSignalContext ¶
func SetSignalContext(ctx context.Context, stopFile string)
SetSignalContext stores the signal-aware context created in main(). This must be called before Execute() to ensure proper signal handling
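A minimal main() sketch wiring a signal-aware context into the package before Execute(); the stop-file path is a hypothetical value, since its semantics are not documented here:

package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"

	archiver "github.com/airframesio/data-archiver/cmd"
)

func main() {
	// Cancel the context on SIGINT/SIGTERM, then hand it to the package.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	archiver.SetSignalContext(ctx, "/tmp/data-archiver.stop")
	if err := archiver.Execute(); err != nil {
		log.Fatal(err)
	}
}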
func SplitPartitionByDuration ¶
func SplitPartitionByDuration(partitionStart, partitionEnd time.Time, duration string) []struct {
	Start time.Time
	End   time.Time
}
SplitPartitionByDuration splits a partition's date range into multiple time ranges based on duration
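An illustrative split of a three-day partition into daily ranges; the boundary values are placeholders:

package main

import (
	"fmt"
	"time"

	archiver "github.com/airframesio/data-archiver/cmd"
)

func main() {
	ranges := archiver.SplitPartitionByDuration(
		time.Date(2024, 3, 1, 0, 0, 0, 0, time.UTC),
		time.Date(2024, 3, 4, 0, 0, 0, 0, time.UTC),
		archiver.DurationDaily,
	)
	for _, r := range ranges {
		fmt.Println(r.Start, r.End) // one Start/End pair per daily slice
	}
}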
func WriteTaskInfo ¶
func WriteTaskInfo(info *TaskInfo) error
WriteTaskInfo writes current task information to file
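A sketch that publishes task status for consumers such as the status response and cache viewer; all field values are illustrative:

package main

import (
	"log"
	"os"
	"time"

	archiver "github.com/airframesio/data-archiver/cmd"
)

func main() {
	info := &archiver.TaskInfo{
		PID:         os.Getpid(),
		StartTime:   time.Now(),
		Table:       "messages",
		CurrentTask: "archiving",
		LastUpdate:  time.Now(),
	}
	if err := archiver.WriteTaskInfo(info); err != nil {
		log.Fatal(err)
	}
}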
Types ¶
type Archiver ¶
type Archiver struct {
// contains filtered or unexported fields
}
func (*Archiver) ProcessPartitionWithProgress ¶
func (a *Archiver) ProcessPartitionWithProgress(partition PartitionInfo, program *tea.Program) ProcessResult
type CacheEntry ¶
type CacheEntry struct {
Table string `json:"table"`
Partition string `json:"partition"`
RowCount int64 `json:"rowCount"`
CountTime time.Time `json:"countTime"`
FileSize int64 `json:"fileSize"`
UncompressedSize int64 `json:"uncompressedSize"`
FileMD5 string `json:"fileMD5"`
MultipartETag string `json:"multipartETag,omitempty"` // S3 multipart ETag for files >100MB
FileTime time.Time `json:"fileTime"`
S3Key string `json:"s3Key"`
S3Uploaded bool `json:"s3Uploaded"`
S3UploadTime time.Time `json:"s3UploadTime"`
LastError string `json:"lastError"`
ErrorTime time.Time `json:"errorTime"`
ProcessStartTime time.Time `json:"processStartTime,omitempty"` // When processing started for this job
}
type CacheResponse ¶
type CacheResponse struct {
Tables []TableCache `json:"tables"`
Timestamp time.Time `json:"timestamp"`
}
type CacheScope ¶ added in v1.6.0
CacheScope represents a unique namespace for cached metadata. It combines the executing subcommand with the fully-qualified output path so that different commands or destinations do not clobber each other's cache.
func NewCacheScope ¶ added in v1.6.0
func NewCacheScope(command string, cfg *Config) CacheScope
NewCacheScope builds a cache scope for the provided command/config pair.
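A minimal sketch, assuming "archive" as the subcommand name (a placeholder); the scope is stored back on the Config so later cache reads and writes share the same namespace:

package main

import (
	archiver "github.com/airframesio/data-archiver/cmd"
)

func main() {
	cfg := &archiver.Config{Table: "messages"}
	// Namespace cached metadata by command and destination.
	cfg.CacheScope = archiver.NewCacheScope("archive", cfg)
}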
type ColumnInfo ¶ added in v1.4.0
type ColumnInfo struct {
Name string
DataType string
UDTName string // PostgreSQL user-defined type name (e.g., int4, varchar, timestamp)
}
ColumnInfo represents metadata about a database column
func (*ColumnInfo) GetName ¶ added in v1.4.0
func (c *ColumnInfo) GetName() string
GetName implements formatters.ColumnSchema
func (*ColumnInfo) GetType ¶ added in v1.4.0
func (c *ColumnInfo) GetType() string
GetType implements formatters.ColumnSchema
type ColumnTypeMismatch ¶ added in v1.5.5
type ColumnTypeMismatch struct {
ColumnName string `json:"column_name"`
Source1Type string `json:"source1_type"`
Source2Type string `json:"source2_type"`
}
ColumnTypeMismatch represents a column type difference
type CompareConfig ¶ added in v1.5.5
type CompareConfig struct {
Mode string // schema-only, data-only, schema-and-data
DataCompareType string // row-count, row-by-row, sample
SampleSize int
Tables []string // Empty = all tables
OutputFormat string // text, json
OutputFile string
Debug bool
DryRun bool
}
CompareConfig contains comparison configuration
type Comparer ¶ added in v1.5.5
type Comparer struct {
// contains filtered or unexported fields
}
Comparer handles comparison operations
func NewComparer ¶ added in v1.5.5
func NewComparer(source1, source2 *ComparisonSource, config *CompareConfig, logger *slog.Logger) *Comparer
NewComparer creates a new Comparer instance
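A sketch comparing two live databases; host names and credentials are placeholders, and an S3-backed source would instead set Type: "s3" together with the S3, SchemaPath, and DataPath fields. The Comparer's run methods are not shown in this excerpt:

package main

import (
	"log/slog"
	"os"

	archiver "github.com/airframesio/data-archiver/cmd"
)

func main() {
	src1 := &archiver.ComparisonSource{
		Type:     "db",
		Database: archiver.DatabaseConfig{Host: "primary.example.com", Port: 5432, User: "compare", Name: "telemetry", SSLMode: "require"},
	}
	src2 := &archiver.ComparisonSource{
		Type:     "db",
		Database: archiver.DatabaseConfig{Host: "replica.example.com", Port: 5432, User: "compare", Name: "telemetry", SSLMode: "require"},
	}
	cfg := &archiver.CompareConfig{
		Mode:            "schema-and-data",
		DataCompareType: "row-count", // or "row-by-row", "sample"
		OutputFormat:    "text",
	}
	logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
	c := archiver.NewComparer(src1, src2, cfg, logger)
	_ = c
}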
type ComparisonResult ¶ added in v1.5.5
type ComparisonResult struct {
Schema *SchemaComparisonResult `json:"schema,omitempty"`
Data *DataComparisonResult `json:"data,omitempty"`
}
ComparisonResult contains the results of a comparison
type ComparisonSource ¶ added in v1.5.5
type ComparisonSource struct {
Type string // "db" or "s3"
Database DatabaseConfig
S3 S3Config
SchemaPath string // S3 path for schemas
DataPath string // S3 path for data
SchemaSource string // pg_dump, inferred, auto
}
ComparisonSource represents a source for comparison (database or S3)
type Config ¶
type Config struct {
Debug bool
LogFormat string
DryRun bool
Workers int
SkipCount bool
CacheViewer bool
ViewerPort int
ChunkSize int // Number of rows to process in each chunk (streaming mode)
IncludeNonPartitionTables bool // Include regular tables matching partition naming pattern
Database DatabaseConfig
S3 S3Config
Table string
StartDate string
EndDate string
OutputDuration string
OutputFormat string
Compression string
CompressionLevel int
DateColumn string
DumpMode string // pg_dump mode: schema-only, data-only, schema-and-data
CacheScope CacheScope
}
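An illustrative Config; the values are placeholders chosen to satisfy the validation errors listed under Variables (workers at least 1, chunk size at least 100, and so on), and the date-string layout is an assumption since the expected format is not documented in this excerpt:

package main

import (
	archiver "github.com/airframesio/data-archiver/cmd"
)

func main() {
	cfg := archiver.Config{
		Workers:   4,
		ChunkSize: 10000,
		Database: archiver.DatabaseConfig{
			Host:             "localhost",
			Port:             5432,
			User:             "archiver",
			Name:             "telemetry",
			SSLMode:          "require",
			StatementTimeout: 300,
			MaxRetries:       3,
			RetryDelay:       5,
		},
		Table:            "messages",
		StartDate:        "2024-01-01",
		EndDate:          "2024-03-31",
		OutputDuration:   archiver.DurationDaily,
		OutputFormat:     "jsonl",
		Compression:      "zstd",
		CompressionLevel: 3,
		DateColumn:       "created_at",
		DumpMode:         "schema-and-data",
	}
	_ = cfg
}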
type DataComparisonResult ¶ added in v1.5.5
type DataComparisonResult struct {
RowCountDiffs map[string]*RowCountDiff `json:"row_count_diffs,omitempty"`
RowByRowDiffs map[string]*RowByRowDiff `json:"row_by_row_diffs,omitempty"`
SampleDiffs map[string]*SampleDiff `json:"sample_diffs,omitempty"`
}
DataComparisonResult contains data comparison results
type DatabaseConfig ¶
type DatabaseConfig struct {
Host string
Port int
User string
Password string
Name string
SSLMode string
StatementTimeout int // Statement timeout in seconds (0 = no timeout, default 300)
MaxRetries int // Maximum number of retry attempts for failed queries (default 3)
RetryDelay int // Delay in seconds between retry attempts (default 5)
}
type GitHubRelease ¶
type GitHubRelease struct {
TagName string `json:"tag_name"`
Name string `json:"name"`
PublishedAt time.Time `json:"published_at"`
HTMLURL string `json:"html_url"`
}
GitHubRelease represents the structure of GitHub's latest release API response
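A sketch that decodes GitHub's latest-release endpoint into a GitHubRelease; the repository URL is inferred from the module path, and this is not necessarily how the package performs its own version check:

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"

	archiver "github.com/airframesio/data-archiver/cmd"
)

func main() {
	resp, err := http.Get("https://api.github.com/repos/airframesio/data-archiver/releases/latest")
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	var rel archiver.GitHubRelease
	if err := json.NewDecoder(resp.Body).Decode(&rel); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%s published %s: %s\n", rel.TagName, rel.PublishedAt.Format("2006-01-02"), rel.HTMLURL)
}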
type LogMessage ¶ added in v1.5.0
type PartitionCache ¶
type PartitionCache struct {
Entries map[string]PartitionCacheEntry `json:"entries"`
}
PartitionCache stores both row counts and file metadata
type PartitionCacheEntry ¶
type PartitionCacheEntry struct {
// Row count information
RowCount int64 `json:"row_count"`
CountTime time.Time `json:"count_time"`
// File metadata (stored after processing)
FileSize int64 `json:"file_size,omitempty"` // Compressed size
UncompressedSize int64 `json:"uncompressed_size,omitempty"` // Original size
FileMD5 string `json:"file_md5,omitempty"`
MultipartETag string `json:"multipart_etag,omitempty"` // S3 multipart ETag for files >100MB
FileTime time.Time `json:"file_time,omitempty"`
// S3 information
S3Key string `json:"s3_key,omitempty"`
S3Uploaded bool `json:"s3_uploaded,omitempty"`
S3UploadTime time.Time `json:"s3_upload_time,omitempty"`
// Error tracking
LastError string `json:"last_error,omitempty"`
ErrorTime time.Time `json:"error_time,omitempty"`
// Processing time tracking
ProcessStartTime time.Time `json:"process_start_time,omitempty"` // When processing started for this job
}
type PartitionInfo ¶
type PartitionInfo struct {
TableName string
Date time.Time
RowCount int64
RangeStart time.Time
RangeEnd time.Time
}
func (PartitionInfo) HasCustomRange ¶ added in v1.6.0
func (p PartitionInfo) HasCustomRange() bool
type PathTemplate ¶
type PathTemplate struct {
// contains filtered or unexported fields
}
PathTemplate provides functionality to generate S3 paths from templates
func NewPathTemplate ¶
func NewPathTemplate(template string) *PathTemplate
NewPathTemplate creates a new PathTemplate instance
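A minimal construction sketch; the template must contain the {table} placeholder (see ErrPathTemplateInvalid), and the rendering methods are unexported or not shown in this excerpt:

package main

import (
	archiver "github.com/airframesio/data-archiver/cmd"
)

func main() {
	pt := archiver.NewPathTemplate("archives/{table}/")
	_ = pt
}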
type PgDumpExecutor ¶ added in v1.5.4
type PgDumpExecutor struct {
// contains filtered or unexported fields
}
PgDumpExecutor handles pg_dump operations with S3 upload
func NewPgDumpExecutor ¶ added in v1.5.4
func NewPgDumpExecutor(config *Config, logger *slog.Logger) *PgDumpExecutor
NewPgDumpExecutor creates a new pg_dump executor
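A construction sketch; DumpMode must be one of the values accepted per ErrDumpModeInvalid, and the executor's run methods are unexported or not shown in this excerpt:

package main

import (
	"log/slog"
	"os"

	archiver "github.com/airframesio/data-archiver/cmd"
)

func main() {
	cfg := &archiver.Config{DumpMode: "schema-only"}
	logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
	exec := archiver.NewPgDumpExecutor(cfg, logger)
	_ = exec
}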
type ProcessResult ¶
type ProcessResult struct {
Partition PartitionInfo
Compressed bool
Uploaded bool
Skipped bool
SkipReason string
Error error
BytesWritten int64
Stage string
S3Key string // S3 object key for uploaded file
StartTime time.Time // When partition processing started
Duration time.Duration // How long partition processing took
}
type Restorer ¶ added in v1.5.3
type Restorer struct {
// contains filtered or unexported fields
}
Restorer handles restoration of tables from S3
func NewRestorer ¶ added in v1.5.3
NewRestorer creates a new Restorer instance
type RowByRowDiff ¶ added in v1.5.5
type RowByRowDiff struct {
Source1TotalRows int64 `json:"source1_total_rows"`
Source2TotalRows int64 `json:"source2_total_rows"`
MatchingRows int64 `json:"matching_rows"`
MissingInSource2 int64 `json:"missing_in_source2"`
ExtraInSource2 int64 `json:"extra_in_source2"`
}
RowByRowDiff contains row-by-row comparison results
type RowCountCache ¶
type RowCountCache struct {
Counts map[string]RowCountEntry `json:"counts"`
}
RowCountCache is the legacy cache structure, kept for backward compatibility
type RowCountDiff ¶ added in v1.5.5
type RowCountDiff struct {
Source1Count int64 `json:"source1_count"`
Source2Count int64 `json:"source2_count"`
Difference int64 `json:"difference"`
}
RowCountDiff contains row count differences
type RowCountEntry ¶
type S3File ¶ added in v1.5.3
type S3File struct {
Key string
Size int64
LastModified time.Time
DetectedFormat string
DetectedCompression string
Date time.Time // Extracted from filename
}
S3File represents a file found in S3
type SampleDiff ¶ added in v1.5.5
type SampleDiff struct {
Source1Sample []map[string]interface{} `json:"source1_sample,omitempty"`
Source2Sample []map[string]interface{} `json:"source2_sample,omitempty"`
Differences []string `json:"differences"`
}
SampleDiff contains sample comparison results
type SchemaComparisonResult ¶ added in v1.5.5
type SchemaComparisonResult struct {
TablesOnlyInSource1 []string `json:"tables_only_in_source1"`
TablesOnlyInSource2 []string `json:"tables_only_in_source2"`
TableDiffs map[string]*TableSchemaDiff `json:"table_diffs"`
}
SchemaComparisonResult contains schema comparison results
type StatusResponse ¶
type StatusResponse struct {
ArchiverRunning bool `json:"archiverRunning"`
PID int `json:"pid,omitempty"`
CurrentTask *TaskInfo `json:"currentTask,omitempty"`
Version string `json:"version"`
UpdateAvailable bool `json:"updateAvailable"`
LatestVersion string `json:"latestVersion,omitempty"`
ReleaseURL string `json:"releaseUrl,omitempty"`
// Slice tracking fields
CurrentSliceIndex int `json:"currentSliceIndex,omitempty"`
TotalSlices int `json:"totalSlices,omitempty"`
CurrentSliceDate string `json:"currentSliceDate,omitempty"`
IsSlicing bool `json:"isSlicing"`
}
type TableCache ¶
type TableCache struct {
TableName string `json:"tableName"`
Entries []CacheEntry `json:"entries"`
}
type TableSchema ¶ added in v1.4.0
type TableSchema struct {
TableName string
Columns []ColumnInfo
}
TableSchema represents the schema of a database table
func (*TableSchema) GetColumns ¶ added in v1.4.0
func (s *TableSchema) GetColumns() []formatters.ColumnSchema
GetColumns implements formatters.TableSchema
type TableSchemaDiff ¶ added in v1.5.5
type TableSchemaDiff struct {
ColumnsOnlyInSource1 []ColumnInfo `json:"columns_only_in_source1"`
ColumnsOnlyInSource2 []ColumnInfo `json:"columns_only_in_source2"`
TypeMismatches []ColumnTypeMismatch `json:"type_mismatches"`
}
TableSchemaDiff contains differences for a single table
type TaskInfo ¶
type TaskInfo struct {
PID int `json:"pid"`
StartTime time.Time `json:"start_time"`
Table string `json:"table"`
StartDate string `json:"start_date"`
EndDate string `json:"end_date"`
CurrentTask string `json:"current_task"`
CurrentPartition string `json:"current_partition,omitempty"`
CurrentStep string `json:"current_step,omitempty"`
Progress float64 `json:"progress"`
TotalItems int `json:"total_items"`
CompletedItems int `json:"completed_items"`
LastUpdate time.Time `json:"last_update"`
// Slice tracking fields
CurrentSliceIndex int `json:"current_slice_index,omitempty"`
TotalSlices int `json:"total_slices,omitempty"`
CurrentSliceDate string `json:"current_slice_date,omitempty"`
// Partition and slice statistics
TotalPartitions int `json:"total_partitions,omitempty"` // Total partitions discovered
PartitionsCounted int `json:"partitions_counted,omitempty"` // Partitions that have been counted
PartitionsProcessed int `json:"partitions_processed,omitempty"` // Partitions that have been processed
SlicesProcessed int `json:"slices_processed,omitempty"` // Total slices processed across all partitions
}
TaskInfo represents the current archiving task status
func ReadTaskInfo ¶
ReadTaskInfo reads current task information from file
type VersionCheckCache ¶
type VersionCheckCache struct {
UpdateAvailable bool `json:"update_available"`
LatestVersion string `json:"latest_version"`
ReleaseURL string `json:"release_url"`
Timestamp time.Time `json:"timestamp"`
}
VersionCheckCache represents cached version check data