Snapshot Mode Example
This example demonstrates the initial snapshot feature of go-pq-cdc-elasticsearch. It shows how to capture existing data from PostgreSQL tables and index it into Elasticsearch before starting real-time CDC.
What This Example Does
- Starts PostgreSQL with pre-populated data (via init.sql):
  - 1,000 users
  - 500 books
- Takes Snapshot (Snapshot Mode: initial):
  - Captures all existing data from users and books tables
  - Processes data in chunks (1,000 rows per chunk)
  - Indexes snapshot data into Elasticsearch
- Transitions to CDC Mode:
  - After snapshot completes, seamlessly switches to real-time CDC
  - Captures any INSERT, UPDATE, DELETE operations
  - Ensures zero data loss between snapshot and CDC phases
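To get a feel for the chunk sizes involved, the arithmetic can be sketched as below. This only illustrates ceiling division with the row counts and chunk size quoted in this README; how the connector actually partitions tables into chunks is not described here, and `chunkCount` is a hypothetical helper, not part of the library.

```go
package main

import "fmt"

// chunkCount returns how many chunks are needed to cover totalRows
// when each chunk holds at most chunkSize rows (ceiling division).
// Hypothetical helper for illustration only.
func chunkCount(totalRows, chunkSize int) int {
	if totalRows <= 0 || chunkSize <= 0 {
		return 0
	}
	return (totalRows + chunkSize - 1) / chunkSize
}

func main() {
	// Row counts come from init.sql in this example; chunk size from the config.
	tables := []struct {
		name string
		rows int
	}{
		{"users", 1000},
		{"books", 500},
	}
	const chunkSize = 1000

	for _, t := range tables {
		fmt.Printf("%s: %d rows -> %d chunk(s)\n", t.name, t.rows, chunkCount(t.rows, chunkSize))
	}
}
```

With the sample data each table fits in a single chunk under this arithmetic, so a smaller chunk size is needed to see several chunks being processed.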
Prerequisites
- Docker and Docker Compose
- Go 1.22 or higher
Running the Example
1. Start Infrastructure
Start PostgreSQL and Elasticsearch:
docker-compose up -d
Wait a few seconds for services to be ready. PostgreSQL will automatically initialize the database with sample data using init.sql.
2. Verify Data
You can verify the initial data:
# Connect to PostgreSQL
psql "postgres://es_cdc_user:[email protected]/es_cdc_db"
# Check data
SELECT COUNT(*) FROM users; -- Should return 1000
SELECT COUNT(*) FROM books; -- Should return 500
3. Run the Connector
go run main.go
4. Observe the Snapshot Process
You'll see logs indicating:
- Snapshot initialization
- Chunk processing progress
- Total rows snapshotted
- Transition to CDC mode
Example log output:
{"level":"INFO","msg":"📸 snapshot data captured","table":"public.users","type":"SNAPSHOT","timestamp":"2024-11-24T10:15:30Z"}
{"level":"INFO","msg":"📸 snapshot data captured","table":"public.books","type":"SNAPSHOT","timestamp":"2024-11-24T10:15:31Z"}
{"level":"INFO","msg":"Snapshot completed successfully","duration":"5.2s","total_rows":1500}
{"level":"INFO","msg":"✨ insert captured","table":"public.users","type":"INSERT","timestamp":"2024-11-24T10:16:00Z"}
Notice the difference:
- 📸 SNAPSHOT messages = Initial data capture (historical data)
- ✨ INSERT messages = Real-time changes (new data after snapshot)
- 🔄 UPDATE messages = Real-time updates
- 🗑️ DELETE messages = Real-time deletions
5. Check Elasticsearch Indices
Access Kibana at http://localhost:5601 (login: elastic / es_cdc_es_pass) to see the indexed documents:
- Index users: Contains all 1,000 users from snapshot
- Index books: Contains all 500 books from snapshot
Or use curl:
# Check users count
curl -u elastic:es_cdc_es_pass "http://localhost:9200/users/_count" | jq
# Check books count
curl -u elastic:es_cdc_es_pass "http://localhost:9200/books/_count" | jq
# Sample user document
curl -u elastic:es_cdc_es_pass "http://localhost:9200/users/_search?size=1" | jq
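If you would rather check the counts from Go than from `jq`, the sketch below decodes the body returned by the `_count` endpoint and compares it with the expected row count. It assumes the standard `_count` response shape (a top-level `count` field); `parseCount` and `verifyCount` are hypothetical helpers, and the sample body in `main` is hand-written rather than captured from this example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// countResponse mirrors the one field of an Elasticsearch _count response
// that matters here; other fields (such as _shards) are ignored.
type countResponse struct {
	Count int64 `json:"count"`
}

// parseCount extracts the document count from a _count response body.
func parseCount(body []byte) (int64, error) {
	var r countResponse
	if err := json.Unmarshal(body, &r); err != nil {
		return 0, fmt.Errorf("decode _count response: %w", err)
	}
	return r.Count, nil
}

// verifyCount returns an error when the indexed count differs from want.
func verifyCount(index string, body []byte, want int64) error {
	got, err := parseCount(body)
	if err != nil {
		return err
	}
	if got != want {
		return fmt.Errorf("index %q: got %d documents, want %d", index, got, want)
	}
	return nil
}

func main() {
	// Hand-written sample body; a real one comes from GET /users/_count.
	sample := []byte(`{"count":1000,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0}}`)

	if err := verifyCount("users", sample, 1000); err != nil {
		fmt.Println("mismatch:", err)
		return
	}
	fmt.Println("users index matches the snapshot row count")
}
```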
6. Test Real-Time CDC
Insert new data to see real-time CDC in action:
psql "postgres://es_cdc_user:[email protected]/es_cdc_db"
-- Insert a new user
INSERT INTO users (name, email) VALUES ('New User', '[email protected]');
-- Insert a new book
INSERT INTO books (title, author, isbn) VALUES ('New Book', 'New Author', 'ISBN-999');
-- Update a user
UPDATE users SET name = 'Updated User' WHERE id = 1;
-- Delete a book
DELETE FROM books WHERE id = 1;
CDC captures each of these changes and indexes it into Elasticsearch in near real time; watch the connector logs for the matching INSERT, UPDATE, and DELETE messages.
Monitoring
Metrics
Access metrics at http://localhost:8081/metrics
Snapshot-specific metrics:
- go_pq_cdc_snapshot_in_progress: Whether snapshot is running
- go_pq_cdc_snapshot_total_tables: Number of tables being snapshotted
- go_pq_cdc_snapshot_total_chunks: Total chunks to process
- go_pq_cdc_snapshot_completed_chunks: Completed chunks
- go_pq_cdc_snapshot_total_rows: Total rows read during snapshot
- go_pq_cdc_snapshot_duration_seconds: Snapshot duration
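The chunk metrics can be combined into a rough progress figure. The sketch below reads Prometheus text-format output and divides completed chunks by total chunks. The metric names are the ones listed above; `metricValue` and `snapshotProgress` are hypothetical helpers, the sample values are made up, and whether the real metrics carry labels is an assumption the parser merely tolerates.

```go
package main

import (
	"bufio"
	"fmt"
	"strconv"
	"strings"
)

// metricValue scans Prometheus text-format output and returns the value of
// the first sample whose metric name equals name. Labels, if present, are ignored.
func metricValue(exposition, name string) (float64, bool) {
	sc := bufio.NewScanner(strings.NewReader(exposition))
	for sc.Scan() {
		line := strings.TrimSpace(sc.Text())
		if line == "" || strings.HasPrefix(line, "#") {
			continue // skip blank lines and HELP/TYPE comments
		}
		nameEnd := strings.IndexAny(line, "{ ")
		if nameEnd < 0 || line[:nameEnd] != name {
			continue
		}
		rest := line[nameEnd:]
		if j := strings.LastIndexByte(rest, '}'); j >= 0 {
			rest = rest[j+1:] // drop the label set
		}
		fields := strings.Fields(rest)
		if len(fields) == 0 {
			continue
		}
		v, err := strconv.ParseFloat(fields[0], 64)
		if err != nil {
			continue
		}
		return v, true
	}
	return 0, false
}

// snapshotProgress returns completed/total chunks as a fraction in [0, 1].
// ok is false when either metric is missing or total is zero.
func snapshotProgress(exposition string) (fraction float64, ok bool) {
	total, okTotal := metricValue(exposition, "go_pq_cdc_snapshot_total_chunks")
	done, okDone := metricValue(exposition, "go_pq_cdc_snapshot_completed_chunks")
	if !okTotal || !okDone || total <= 0 {
		return 0, false
	}
	return done / total, true
}

func main() {
	// Made-up sample values; real output comes from http://localhost:8081/metrics.
	sample := "go_pq_cdc_snapshot_in_progress 1\n" +
		"go_pq_cdc_snapshot_total_chunks 2\n" +
		"go_pq_cdc_snapshot_completed_chunks 1\n"

	if p, ok := snapshotProgress(sample); ok {
		fmt.Printf("snapshot progress: %.0f%%\n", p*100)
	}
}
```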
Health Check
curl http://localhost:8081/status
Configuration Highlights
Snapshot Configuration
Snapshot: cdcconfig.SnapshotConfig{
	Enabled:           true,                          // Enable snapshot
	Mode:              cdcconfig.SnapshotModeInitial, // Take snapshot only if no previous snapshot exists
	ChunkSize:         1000,                          // Process 1000 rows per chunk
	ClaimTimeout:      30 * time.Second,              // Reclaim timeout for stale chunks
	HeartbeatInterval: 5 * time.Second,               // Worker heartbeat interval
}
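The relationship between ClaimTimeout and HeartbeatInterval can be sketched as follows. This assumes, going only by the comments in the config above, that a claimed chunk becomes reclaimable once its worker has been silent for longer than ClaimTimeout; the connector's actual reclaim logic may differ, and `isStale` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"time"
)

// isStale reports whether a claimed chunk could be reclaimed, given how long
// ago its worker last sent a heartbeat. Assumed semantics, for illustration only.
func isStale(sinceLastHeartbeat, claimTimeout time.Duration) bool {
	return sinceLastHeartbeat > claimTimeout
}

func main() {
	const (
		claimTimeout      = 30 * time.Second // from the config above
		heartbeatInterval = 5 * time.Second  // from the config above
	)

	// A healthy worker heartbeats every interval, so it is never stale.
	fmt.Println("healthy worker stale:", isStale(heartbeatInterval, claimTimeout))

	// A worker silent for a full minute has exceeded the claim timeout.
	fmt.Println("silent worker stale:", isStale(time.Minute, claimTimeout))

	// With these values a worker can miss several heartbeats before reclaim.
	fmt.Println("heartbeat intervals per claim timeout:", int(claimTimeout/heartbeatInterval))
}
```

Keeping ClaimTimeout several times larger than HeartbeatInterval, as in this example, leaves room for a few delayed heartbeats before a chunk is treated as abandoned.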
Handler Function - Distinguishing Snapshot from CDC
func Handler(msg cdc.Message) []elasticsearch.Action {
	// id and payload are derived from msg; that code is elided in this excerpt.

	// Check if this is a snapshot message
	if msg.Type.IsSnapshot() {
		slog.Info("📸 snapshot data captured")
		// Index historical data from snapshot
		return []elasticsearch.Action{
			elasticsearch.NewIndexAction([]byte(id), payload, nil),
		}
	}

	// Handle real-time CDC operations
	if msg.Type.IsInsert() {
		slog.Info("✨ insert captured")
		// ...
	}
	if msg.Type.IsUpdate() {
		slog.Info("🔄 update captured")
		// ...
	}
	if msg.Type.IsDelete() {
		slog.Info("🗑️ delete captured")
		return []elasticsearch.Action{
			elasticsearch.NewDeleteAction([]byte(id), nil),
		}
	}

	// Any other message type produces no Elasticsearch action.
	return nil
}
Key Points:
- Use msg.Type.IsSnapshot() to identify snapshot messages
- Use msg.Type.IsInsert(), IsUpdate(), IsDelete() for CDC operations
- Snapshot and Insert/Update use NewIndexAction for indexing
- Delete uses NewDeleteAction for removing documents
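The routing rule in the key points above can be exercised without a database using local stand-in types. In the sketch below, `kind`, `action`, and `route` are hypothetical and are not the connector's types or API; they only model the mapping from message type to index or delete action.

```go
package main

import "fmt"

// kind is a local stand-in for the connector's message type (hypothetical).
type kind int

const (
	kindSnapshot kind = iota
	kindInsert
	kindUpdate
	kindDelete
)

// action is a local stand-in for an Elasticsearch action (hypothetical).
type action struct {
	Op  string // "index" or "delete"
	ID  string
	Doc []byte // nil for delete actions
}

// route maps a message kind to the action the handler would emit:
// snapshot, insert, and update index the document; delete removes it.
func route(k kind, id string, doc []byte) []action {
	switch k {
	case kindSnapshot, kindInsert, kindUpdate:
		return []action{{Op: "index", ID: id, Doc: doc}}
	case kindDelete:
		return []action{{Op: "delete", ID: id}}
	default:
		return nil // unknown kinds produce no action
	}
}

func main() {
	doc := []byte(`{"name":"New User"}`)
	for _, k := range []kind{kindSnapshot, kindInsert, kindUpdate, kindDelete} {
		for _, a := range route(k, "1", doc) {
			fmt.Printf("kind=%d -> %s id=%s\n", k, a.Op, a.ID)
		}
	}
}
```

Because snapshot rows and later inserts or updates all become index actions keyed by the same document ID, a row that is snapshotted and then updated ends up as a single, current document.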
Snapshot Modes
| Mode | Description |
|---|---|
| initial | Take snapshot only if no previous snapshot exists, then start CDC (used in this example) |
| never | Skip snapshot, start CDC immediately |
| snapshot_only | Take snapshot and exit (no CDC) |
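The table can be read as a small decision function. The sketch below encodes the three modes as described; `plan` and `planFor` are hypothetical names, the mode strings are taken from the table, and how `snapshot_only` behaves when a previous snapshot already exists is not stated here, so the code simply always snapshots in that mode.

```go
package main

import "fmt"

// plan describes what a run does for a given snapshot mode (hypothetical type).
type plan struct {
	TakeSnapshot bool
	StartCDC     bool
}

// planFor translates the modes table into a plan.
// previousSnapshotExists only matters for the "initial" mode.
func planFor(mode string, previousSnapshotExists bool) (plan, error) {
	switch mode {
	case "initial":
		return plan{TakeSnapshot: !previousSnapshotExists, StartCDC: true}, nil
	case "never":
		return plan{TakeSnapshot: false, StartCDC: true}, nil
	case "snapshot_only":
		// Assumption: always snapshots; the table does not cover reruns.
		return plan{TakeSnapshot: true, StartCDC: false}, nil
	default:
		return plan{}, fmt.Errorf("unknown snapshot mode %q", mode)
	}
}

func main() {
	for _, mode := range []string{"initial", "never", "snapshot_only"} {
		p, err := planFor(mode, false)
		if err != nil {
			fmt.Println(err)
			continue
		}
		fmt.Printf("%-13s snapshot=%t cdc=%t\n", mode, p.TakeSnapshot, p.StartCDC)
	}
}
```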
Cleanup
# Stop services
docker-compose down
# Remove volumes (optional, to reset data)
docker-compose down -v
Key Differences from Simple Example
- Pre-populated data: Database starts with existing data via init.sql
- Snapshot enabled: Captures existing data before CDC
- Zero data loss: Ensures all data (historical + real-time) is captured
- Chunk-based processing: Memory-efficient processing of large datasets
Learn More