Synchronize Production and Local Postgres Databases.
<?php
namespace App\Console\Commands\Maintenance;
use Illuminate\Console\Command;
use Symfony\Component\Process\Process;
class MigrateDatabaseServers extends Command
{
/**
* The name and signature of the console command.
*
* @var string
*/
protected $signature = 'app:migrate-database-servers
{--from= : Source connection string}
{--to= : Destination connection string}
{--src-schema= : Source schema name}
{--dest-schema=public : Destination schema name}
{--dump-dir= : Directory to store dump files}
{--tables= : Which tables to migrate}
{--table= : Single table to migrate}
{--chunk= : Chunk size (rows per dump file)}
{--incremental : Perform incremental sync}';
/**
* The console command description.
*
* @var string
*/
protected $description = 'Migrate database tables between servers using pg_dump and psql with specific chunk sizes and order';
/**
* Execute the console command.
*
* @return void
*/
public function handle()
{
// Parse command options
$tables = $this->option('tables'); // Note: currently unused; the built-in table list below is migrated unless --table is given
$srcConn = $this->option('from');
$destConn = $this->option('to');
$srcSchema = $this->option('src-schema');
$destSchema = $this->option('dest-schema');
$dumpDir = $this->option('dump-dir') ?: storage_path('dumps');
$table = $this->option('table');
$chunk = $this->option('chunk');
$incremental = $this->option('incremental');
// Validate required options
if (!$srcConn || !$destConn || !$srcSchema) {
$this->error('Source connection, destination connection, and source schema are required.');
return;
}
// Ensure dump directory exists
if (!is_dir($dumpDir)) {
mkdir($dumpDir, 0777, true);
}
// Migrate a single table if specified
if ($table) {
$this->migrateTable($srcConn, $destConn, $srcSchema, $destSchema, $table, $chunk, $dumpDir, $incremental);
return;
}
$allTables = [
['table' => 'users', 'chunk' => 5000],
['table' => 'permissions', 'chunk' => 5000],
['table' => 'roles', 'chunk' => 5000],
['table' => 'model_has_permissions', 'chunk' => 5000],
['table' => 'model_has_roles', 'chunk' => 5000],
['table' => 'role_has_permissions', 'chunk' => 5000],
['table' => 'teams', 'chunk' => 5000],
['table' => 'team_user', 'chunk' => 5000],
['table' => 'keyword_follows', 'chunk' => 5000],
['table' => 'my_shopify_apps', 'chunk' => 5000],
['table' => 'app_follows', 'chunk' => 5000],
['table' => 'password_reset_tokens', 'chunk' => 5000],
['table' => 'personal_access_tokens', 'chunk' => 5000],
['table' => 'lemon_squeezy_customers', 'chunk' => 5000],
['table' => 'lemon_squeezy_subscriptions', 'chunk' => 5000],
['table' => 'lemon_squeezy_orders', 'chunk' => 5000],
];
$this->parallelMigrateTables($srcConn, $destConn, $srcSchema, $destSchema, $allTables, $dumpDir, $incremental);
$this->resetSequences($destConn, $destSchema, $allTables);
$this->info("Migration completed.");
}
/**
* Migrate tables in parallel using multiple processes.
*
* @param string $srcConn Source connection string
* @param string $destConn Destination connection string
* @param string $srcSchema Source schema name
* @param string $destSchema Destination schema name
* @param array $tables List of tables to migrate with their chunk sizes
* @param string $dumpDir Directory to store dump files
* @param bool $incremental Perform incremental sync
* @return void
*/
protected function parallelMigrateTables($srcConn, $destConn, $srcSchema, $destSchema, $tables, $dumpDir, $incremental)
{
$maxProcesses = $this->getCpuCores(); // Automatically detect the number of CPU cores
$activeProcesses = 0;
$pids = [];
$progressBars = [];
$lineIndex = 1; // Start from the first line
foreach ($tables as $tableInfo) {
if ($activeProcesses >= $maxProcesses) {
$pid = pcntl_wait($status);
if (isset($progressBars[$pid])) {
$progressBars[$pid]['progressBar']->finish();
$this->clearLines($progressBars[$pid]['lineIndex'], 2); // Clear the action and progress bar lines
unset($progressBars[$pid]);
}
unset($pids[$pid]);
$activeProcesses--;
}
$pid = pcntl_fork();
if ($pid == -1) {
$this->error("Could not fork process for table " . $tableInfo['table']);
continue;
} elseif ($pid) {
// Parent process
$pids[$pid] = $tableInfo['table'];
$activeProcesses++;
$progressBars[$pid] = [
'progressBar' => $this->displayProgress("Migrating table: " . $tableInfo['table'], 100, $lineIndex), // Assuming 100 as a placeholder for total steps
'lineIndex' => $lineIndex
];
$lineIndex += 2; // Increment by 2 lines for the next process
} else {
// Child process
$this->migrateTable($srcConn, $destConn, $srcSchema, $destSchema, $tableInfo['table'], $tableInfo['chunk'], $dumpDir, $incremental);
exit(0);
}
}
// Wait for all child processes to finish
while (count($pids) > 0) {
$pid = pcntl_wait($status);
if (isset($progressBars[$pid])) {
$progressBars[$pid]['progressBar']->finish();
$this->clearLines($progressBars[$pid]['lineIndex'], 2); // Clear the action and progress bar lines
unset($progressBars[$pid]);
}
unset($pids[$pid]);
}
}
/**
* Display a progress bar for a given action.
*
* @param string $action The action being performed, displayed as a message.
* @param int $total The total number of steps for the progress bar.
* @param int $lineIndex The console line at which to print the action message.
* @return \Symfony\Component\Console\Helper\ProgressBar The created progress bar.
*/
protected function displayProgress($action, $total, $lineIndex)
{
$this->output->write("\033[{$lineIndex};0H$action\n");
$progressBar = $this->output->createProgressBar($total);
$progressBar->setFormat("%current%/%max% [%bar%] %percent:3s%%");
$progressBar->start();
return $progressBar;
}
/**
* Migrate a list of tables sequentially (alternative to parallelMigrateTables; not currently called).
*
* @param string $srcConn Source connection string.
* @param string $destConn Destination connection string.
* @param string $srcSchema Source schema name.
* @param string $destSchema Destination schema name.
* @param array $tables List of tables to migrate with their chunk sizes.
* @param string $dumpDir Directory to store dump files.
* @param bool $incremental Perform incremental sync if true, otherwise perform full sync.
* @return void
*/
protected function migrateTables($srcConn, $destConn, $srcSchema, $destSchema, $tables, $dumpDir, $incremental)
{
foreach ($tables as $tableInfo) {
// Extract table name and chunk size
$table = $tableInfo['table'];
$chunkSize = $tableInfo['chunk'];
$this->migrateTable($srcConn, $destConn, $srcSchema, $destSchema, $table, $chunkSize, $dumpDir, $incremental);
}
}
/**
* Migrate a single table from the source to the destination.
*
* @param string $srcConn Source connection string.
* @param string $destConn Destination connection string.
* @param string $srcSchema Source schema name.
* @param string $destSchema Destination schema name.
* @param string $table The table name to migrate.
* @param int $chunkSize The size of each chunk for migration.
* @param string $dumpDir Directory to store dump files.
* @param bool $incremental Perform incremental sync if true, otherwise perform full sync.
* @return void
*/
protected function migrateTable($srcConn, $destConn, $srcSchema, $destSchema, $table, $chunkSize, $dumpDir, $incremental = false)
{
if ($incremental) {
$this->info("Performing incremental sync for table $table");
// Check if 'id' column exists
$orderColumnCommand = "psql $srcConn -t -c \"SELECT column_name FROM information_schema.columns WHERE table_schema = '$srcSchema' AND table_name = '$table' AND column_name = 'id';\"";
$orderColumnProcess = Process::fromShellCommandline($orderColumnCommand);
$orderColumnProcess->run();
if (!$orderColumnProcess->isSuccessful()) {
$this->error("Error checking for 'id' column for table $table: " . $orderColumnProcess->getErrorOutput());
throw new \RuntimeException("Error checking for 'id' column for table $table: " . $orderColumnProcess->getErrorOutput());
}
$hasIdColumn = trim($orderColumnProcess->getOutput()) === 'id';
if ($hasIdColumn) {
// Get highest id from destination
$destMaxIdCommand = "psql $destConn -t -c \"SELECT COALESCE(MAX(id), 0) FROM $destSchema.$table;\"";
$destMaxIdProcess = Process::fromShellCommandline($destMaxIdCommand);
$destMaxIdProcess->run();
if (!$destMaxIdProcess->isSuccessful()) {
$this->error("Error getting max id from destination for table $table: " . $destMaxIdProcess->getErrorOutput());
throw new \RuntimeException("Error getting max id from destination for table $table: " . $destMaxIdProcess->getErrorOutput());
}
$destMaxId = (int)trim($destMaxIdProcess->getOutput());
// Get highest id from source
$srcMaxIdCommand = "psql $srcConn -t -c \"SELECT COALESCE(MAX(id), 0) FROM $srcSchema.$table;\"";
$srcMaxIdProcess = Process::fromShellCommandline($srcMaxIdCommand);
$srcMaxIdProcess->run();
if (!$srcMaxIdProcess->isSuccessful()) {
$this->error("Error getting max id from source for table $table: " . $srcMaxIdProcess->getErrorOutput());
throw new \RuntimeException("Error getting max id from source for table $table: " . $srcMaxIdProcess->getErrorOutput());
}
$srcMaxId = (int)trim($srcMaxIdProcess->getOutput());
if ($srcMaxId > $destMaxId) {
$this->info("Incremental sync required for table $table from id $destMaxId to $srcMaxId");
// Perform incremental dump and restore
$this->incrementalDumpAndRestore($srcConn, $destConn, $srcSchema, $destSchema, $table, $destMaxId, $srcMaxId, $chunkSize, $dumpDir);
} else {
$this->info("No new data to sync for table $table");
}
} else {
$this->info("Table $table does not have an 'id' column, performing full sync");
$this->fullSync($srcConn, $destConn, $srcSchema, $destSchema, $table, $chunkSize, $dumpDir);
}
} else {
$this->fullSync($srcConn, $destConn, $srcSchema, $destSchema, $table, $chunkSize, $dumpDir);
}
}
/**
* Perform a full synchronization of a table from the source to the destination.
*
* @param string $srcConn Source connection string.
* @param string $destConn Destination connection string.
* @param string $srcSchema Source schema name.
* @param string $destSchema Destination schema name.
* @param string $table The table name to synchronize.
* @param int $chunkSize The size of each chunk for migration.
* @param string $dumpDir Directory to store dump files.
* @return void
*/
protected function fullSync($srcConn, $destConn, $srcSchema, $destSchema, $table, $chunkSize, $dumpDir)
{
// Truncate the destination table
$truncateCommand = "psql $destConn -c \"TRUNCATE TABLE $destSchema.$table;\"";
$truncateProcess = Process::fromShellCommandline($truncateCommand);
$truncateProcess->run();
if (!$truncateProcess->isSuccessful()) {
$this->error("Error truncating table $table: " . $truncateProcess->getErrorOutput());
throw new \RuntimeException("Error truncating table $table: " . $truncateProcess->getErrorOutput());
}
// Dump the database table
$dumpFiles = $this->dumpDatabase($srcConn, $srcSchema, $table, $chunkSize, $dumpDir);
// Replace schema if necessary
if ($srcSchema !== $destSchema) {
$count = count($dumpFiles);
$this->info('Replacing schema name');
$bar = $this->output->createProgressBar($count);
$bar->start();
foreach ($dumpFiles as $file) {
$this->searchReplaceSchema($file, $srcSchema, $destSchema);
$bar->advance();
}
$bar->finish();
$this->info("\nReplacing finished.\n"); // Ensure newline for clarity
}
// Restore the database table
$this->restoreDatabase($destConn, $dumpFiles);
// Clean up dump files
foreach ($dumpFiles as $file) {
if (file_exists($file)) {
unlink($file);
}
}
}
/**
* Perform an incremental dump and restore of a table from the source to the destination.
*
* @param string $srcConn Source connection string.
* @param string $destConn Destination connection string.
* @param string $srcSchema Source schema name.
* @param string $destSchema Destination schema name.
* @param string $table The table name to synchronize.
* @param int $startId The starting ID for the incremental dump.
* @param int $endId The ending ID for the incremental dump.
* @param int $chunkSize The size of each chunk for migration.
* @param string $dumpDir Directory to store dump files.
* @return void
*/
protected function incrementalDumpAndRestore($srcConn, $destConn, $srcSchema, $destSchema, $table, $startId, $endId, $chunkSize, $dumpDir)
{
$dumpFiles = [];
$offset = 0;
$chunkSize = max(1, (int)$chunkSize); // Guard: a missing/zero --chunk would divide by zero and stall the loop below
$totalRows = $endId - $startId;
// Create a progress bar
$totalChunks = (int)ceil($totalRows / $chunkSize);
$this->info('Dumping table ' . $table);
$progressBar = $this->output->createProgressBar($totalChunks);
$progressBar->start();
while ($startId + $offset < $endId) {
// Compute chunk bounds in PHP rather than relying on arithmetic inside the interpolated SQL
$lowerBound = $startId + $offset;
$upperBound = min($lowerBound + $chunkSize, $endId);
$chunkFile = $dumpDir . "/chunk_" . $table . "_" . $lowerBound . ".sql";
$dumpFiles[] = $chunkFile;
// Generate SQL statements for the chunk
$sqlQuery = "COPY (SELECT * FROM $srcSchema.$table WHERE id > $lowerBound AND id <= $upperBound ORDER BY id) TO STDOUT";
$command = "psql $srcConn -c \"$sqlQuery\" > $chunkFile";
$process = Process::fromShellCommandline($command);
$process->run();
if (!$process->isSuccessful()) {
$this->error("Error exporting chunk for table $table (offset: $offset): " . $process->getErrorOutput());
throw new \RuntimeException("Error exporting chunk for table $table (offset: $offset): " . $process->getErrorOutput());
}
$offset += $chunkSize;
$progressBar->advance();
}
$progressBar->finish();
$this->info("\nExport completed for table $table\n"); // Ensure newline for clarity
// Replace schema if necessary
if ($srcSchema !== $destSchema) {
$count = count($dumpFiles);
$this->info('Replacing schema name');
$bar = $this->output->createProgressBar($count);
$bar->start();
foreach ($dumpFiles as $file) {
$this->searchReplaceSchema($file, $srcSchema, $destSchema);
$bar->advance();
}
$bar->finish();
$this->info("\nReplacing finished.\n"); // Ensure newline for clarity
}
// Restore the database table
$this->restoreDatabase($destConn, $dumpFiles);
// Clean up dump files
foreach ($dumpFiles as $file) {
if (file_exists($file)) {
unlink($file);
}
}
}
/**
* Dumps the database table to SQL files, either in chunks or as a single file.
*
* @param string $srcConn Source connection string
* @param string $srcSchema Source schema name
* @param string $table Table name to dump
* @param int $chunkSize Size of each chunk for chunked export
* @param string $dumpDir Directory to store dump files
* @return array List of dump files generated
* @throws \RuntimeException If any error occurs during the dumping process
*/
protected function dumpDatabase($srcConn, $srcSchema, $table, $chunkSize, $dumpDir)
{
$dumpFiles = [];
// Check if 'id' column exists
$orderColumnCommand = "psql $srcConn -t -c \"SELECT column_name FROM information_schema.columns WHERE table_schema = '$srcSchema' AND table_name = '$table' AND column_name = 'id';\"";
$orderColumnProcess = Process::fromShellCommandline($orderColumnCommand);
$orderColumnProcess->run();
if (!$orderColumnProcess->isSuccessful()) {
$this->error("Error checking for 'id' column for table $table: " . $orderColumnProcess->getErrorOutput());
throw new \RuntimeException("Error checking for 'id' column for table $table: " . $orderColumnProcess->getErrorOutput());
}
$hasIdColumn = trim($orderColumnProcess->getOutput()) === 'id';
if ((int)$chunkSize > 0) {
// Chunked export
$offset = 0;
$totalRowsCommand = "psql $srcConn -t -c \"SELECT COUNT(*) FROM $srcSchema.$table;\"";
$totalRowsProcess = Process::fromShellCommandline($totalRowsCommand);
$totalRowsProcess->run();
if (!$totalRowsProcess->isSuccessful()) {
$this->error("Error getting row count for table $table: " . $totalRowsProcess->getErrorOutput());
throw new \RuntimeException("Error getting row count for table $table: " . $totalRowsProcess->getErrorOutput());
}
$totalRows = (int)trim($totalRowsProcess->getOutput());
// Create a progress bar
$totalChunks = ceil($totalRows / $chunkSize);
$this->info('Dumping table ' . $table);
$progressBar = $this->output->createProgressBar($totalChunks);
$progressBar->start();
while ($offset < $totalRows) {
$chunkFile = $dumpDir . "/chunk_" . $table . "_" . $offset . ".sql";
$dumpFiles[] = $chunkFile;
// Generate SQL statements for the chunk
if ($hasIdColumn) {
$sqlQuery = "COPY (SELECT * FROM $srcSchema.$table ORDER BY id LIMIT $chunkSize OFFSET $offset) TO STDOUT";
} else {
// Use a default ordering strategy or no ordering if 'id' column is not present
$sqlQuery = "COPY (SELECT * FROM $srcSchema.$table LIMIT $chunkSize OFFSET $offset) TO STDOUT";
}
$command = "psql $srcConn -c \"$sqlQuery\" > $chunkFile";
$process = Process::fromShellCommandline($command);
$process->run();
if (!$process->isSuccessful()) {
$this->error("Error exporting chunk for table $table (offset: $offset): " . $process->getErrorOutput());
throw new \RuntimeException("Error exporting chunk for table $table (offset: $offset): " . $process->getErrorOutput());
}
$offset += $chunkSize;
$progressBar->advance();
}
$progressBar->finish();
$this->info("\nExport completed for table $table\n"); // Ensure newline for clarity
} else {
// Single file export
$dumpFile = $dumpDir . "/dump_" . $table . ".sql";
$dumpFiles[] = $dumpFile;
$command = "pg_dump $srcConn --schema=$srcSchema --table=$srcSchema.$table --no-owner --no-privileges --data-only --file=$dumpFile";
$this->info("Running pg_dump for table $table");
$process = Process::fromShellCommandline($command);
$process->setTimeout(null); // No timeout: a full-table pg_dump can easily exceed Symfony's default 60 seconds
$process->run();
if (!$process->isSuccessful()) {
$this->error("Error dumping table $table: " . $process->getErrorOutput());
$this->error("pg_dump command: $command");
throw new \RuntimeException("Error dumping table $table: " . $process->getErrorOutput());
}
}
return $dumpFiles;
}
/**
* Restores the database from the provided dump files.
*
* @param string $destConn Destination connection string
* @param array $dumpFiles List of dump files to restore
* @return void
* @throws \RuntimeException If any error occurs during the restoration process
*/
protected function restoreDatabase($destConn, $dumpFiles)
{
$bar = $this->output->createProgressBar(count($dumpFiles));
$bar->start();
foreach ($dumpFiles as $dumpFile) {
// Ensure the file exists before trying to restore
if (!file_exists($dumpFile)) {
$this->error("Dump file $dumpFile does not exist.");
throw new \RuntimeException("Dump file $dumpFile does not exist.");
}
// Determine the table name from the dump file name.
// Chunked files are named chunk_<table>_<offset>.sql; full dumps are dump_<table>.sql
if (preg_match('/chunk_(.*?)_(\d+)\.sql$/', basename($dumpFile), $matches)) {
$table = $matches[1];
} elseif (preg_match('/^dump_(.*)\.sql$/', basename($dumpFile), $matches)) {
$table = $matches[1];
} else {
$this->error("Could not determine table name from dump file $dumpFile.");
throw new \RuntimeException("Could not determine table name from dump file $dumpFile.");
}
// Read the dump file content
$dumpContent = file_get_contents($dumpFile);
if ($dumpContent === false) {
$this->error("Error reading dump file $dumpFile.");
throw new \RuntimeException("Error reading dump file $dumpFile.");
}
// Convert COPY statements to INSERT ... ON CONFLICT
$insertStatements = $this->convertCopyToInsert($dumpContent, $table);
// Execute the insert statements (escapeshellarg guards quotes, backticks and $ in data values)
foreach ($insertStatements as $insertStatement) {
$command = "psql $destConn -c " . escapeshellarg($insertStatement);
$process = Process::fromShellCommandline($command);
$process->run();
if (!$process->isSuccessful()) {
$this->error("Error restoring dump file $dumpFile: " . $process->getErrorOutput());
throw new \RuntimeException("Error restoring dump file $dumpFile: " . $process->getErrorOutput());
}
}
$bar->advance();
}
$bar->finish();
$this->info("\nRestoration completed.\n"); // Ensure newline for clarity
}
/**
* Converts COPY statements in the dump content to INSERT ... ON CONFLICT statements.
*
* @param string $dumpContent Content of the dump file
* @param string $table Table name
* @return array List of INSERT statements
*/
protected function convertCopyToInsert($dumpContent, $table)
{
$lines = explode("\n", $dumpContent);
$insertStatements = [];
$columns = null;
foreach ($lines as $line) {
if (strpos($line, 'COPY') === 0) {
// pg_dump output: extract the column list from the COPY header
preg_match('/COPY\s+\S+\s+\((.*?)\)\s+FROM/', $line, $matches);
$columns = $matches[1] ?? null;
} elseif ($line === '\.' || $line === '' || strpos($line, '--') === 0 || strpos($line, 'SET ') === 0 || strpos($line, 'SELECT ') === 0) {
// Skip the end-of-data marker and pg_dump preamble (comments, SET/SELECT statements)
continue;
} else {
// Data row. Chunked "COPY ... TO STDOUT" files have no header line,
// so $columns may be null; insert without an explicit column list then.
$fields = array_map(function ($field) {
if ($field === '\\N') {
return 'NULL'; // COPY text-format representation of NULL
}
// Escape single quotes; embedded tabs/newlines are already escaped by COPY
return "'" . str_replace("'", "''", $field) . "'";
}, explode("\t", $line));
$values = '(' . implode(', ', $fields) . ')';
$columnList = $columns ? " ($columns)" : '';
$insertStatements[] = "INSERT INTO $table$columnList VALUES $values ON CONFLICT DO NOTHING;";
}
}
return $insertStatements;
}
/**
* Replaces the source schema with the destination schema in the dump file.
*
* @param string $dumpFile Path to the dump file
* @param string $srcSchema Source schema name
* @param string $destSchema Destination schema name
* @return void
* @throws \RuntimeException If any error occurs during the search and replace process
*/
protected function searchReplaceSchema($dumpFile, $srcSchema, $destSchema)
{
$command = "sed -i 's/{$srcSchema}/{$destSchema}/g' $dumpFile";
$process = Process::fromShellCommandline($command);
$process->run();
if (!$process->isSuccessful()) {
throw new \RuntimeException("Error performing search and replace: " . $process->getErrorOutput());
}
}
/**
* Resets the sequences for the specified tables in the destination schema.
*
* @param string $destConn Destination connection string
* @param string $destSchema Destination schema name
* @param array $tables List of tables with their information
* @return void
* @throws \RuntimeException If any error occurs during the sequence reset process
*/
protected function resetSequences($destConn, $destSchema, $tables)
{
foreach ($tables as $tableInfo) {
$table = $tableInfo['table'];
$sequenceName = "{$destSchema}.{$table}_id_seq";
$resetSequenceCommand = <<<SQL
DO \$\$
DECLARE
max_id BIGINT;
BEGIN
-- Skip tables (e.g. pivot tables) that have no id sequence
IF to_regclass('{$sequenceName}') IS NULL THEN
RETURN;
END IF;
SELECT COALESCE(MAX(id), 0) INTO max_id FROM {$destSchema}.{$table};
PERFORM setval('{$sequenceName}', max_id + 1, false);
END;
\$\$;
SQL;
// Write the SQL command to a temporary file
$tempFile = tempnam(sys_get_temp_dir(), 'reset_sequence_');
file_put_contents($tempFile, $resetSequenceCommand);
// Execute the SQL command using psql
$process = Process::fromShellCommandline("psql $destConn -f $tempFile");
$process->run();
// Remove the temporary file
unlink($tempFile);
if (!$process->isSuccessful()) {
$this->error("Error resetting sequence for table $table: " . $process->getErrorOutput());
throw new \RuntimeException("Error resetting sequence for table $table: " . $process->getErrorOutput());
}
}
}
/**
* Clears the specified number of lines from the console output.
*
* This method positions the cursor at the given line index and clears that
* line, repeating for the requested number of lines. It is useful for
* removing previously printed lines from the console output.
*
* @param int $lineIndex The 1-based console line to start clearing from.
* @param int $numLines The number of lines to clear.
* @return void
*/
protected function clearLines($lineIndex, $numLines)
{
for ($i = 0; $i < $numLines; $i++) {
// Move cursor to the specific line (ANSI rows/columns are 1-based)
$this->output->write("\033[{$lineIndex};1H");
// Clear the line
$this->output->write("\033[2K");
$lineIndex++;
}
}
/**
* Detects the number of CPU cores available on the system.
*
* This method attempts to determine the number of CPU cores on the system
* by executing platform-specific commands. If the detection fails, it defaults to 1 core.
*
* @return int The number of CPU cores detected.
*/
protected function getCpuCores()
{
$numCores = 1; // Default to 1 core if detection fails
if (stristr(PHP_OS, 'darwin')) {
// macOS
$process = Process::fromShellCommandline('sysctl -n hw.ncpu');
$process->run();
if ($process->isSuccessful()) {
$numCores = (int)trim($process->getOutput());
}
} elseif (stristr(PHP_OS, 'linux')) {
// Linux
$process = Process::fromShellCommandline('nproc');
$process->run();
if ($process->isSuccessful()) {
$numCores = (int)trim($process->getOutput());
}
} elseif (stristr(PHP_OS, 'win')) {
// Windows
$process = Process::fromShellCommandline('wmic cpu get NumberOfCores');
$process->run();
if ($process->isSuccessful()) {
$output = explode("\n", trim($process->getOutput()));
$numCores = (int)trim($output[1]);
}
}
return $numCores;
}
}
@goranculibrk (Author)

During the development of my new app, I found myself needing to migrate from a Postgres database on Azure (Cosmos DB) to self-hosted Supabase. The migration was a PITA, so I built a simple command to do it.

I've expanded it to include incremental sync, meaning I can easily pull selected tables from my production database into my dev environment without problems.

This command is not one size fits all, but it's a good starting point.
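
The class namespace implies the file lives at app/Console/Commands/Maintenance/MigrateDatabaseServers.php. The command shells out to pg_dump, psql and sed, and parallelizes with pcntl_fork, so those need to be available; a quick sanity check on a typical Linux box:

php -m | grep pcntl
which pg_dump psql sed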

Basic Usage

Running a full sync

First we wipe the local database, then run the command:

php artisan migrate:fresh
php artisan app:migrate-database-servers \
  --from="postgres://postgres:postgres@postgres1:5432/database1" \
  --to="postgresql://postgres:postgres@postgres2:54322/database2" \
  --src-schema="ranksy" \
  --dest-schema="public" \
  --tables="all"

Performing incremental sync

The command currently keys incremental sync off auto-incrementing ids. I plan to expand it to use timestamps and other primary keys to find new data, but for pulling production into a dev environment on top of a fresh migration, it works for me.
To run an incremental sync, simply add the --incremental flag:

php artisan app:migrate-database-servers \
  --from="postgres://postgres:postgres@postgres1:5432/database1" \
  --to="postgresql://postgres:postgres@postgres2:54322/database2" \
  --src-schema="ranksy" \
  --dest-schema="public" \
  --tables="all" \
  --incremental
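
Under the hood, incremental mode compares the highest id on each side and only dumps the rows in between; roughly the same check you can run by hand (connection strings and table taken from the examples above):

psql "postgres://postgres:postgres@postgres1:5432/database1" -t -c "SELECT COALESCE(MAX(id), 0) FROM ranksy.users;"
psql "postgresql://postgres:postgres@postgres2:54322/database2" -t -c "SELECT COALESCE(MAX(id), 0) FROM public.users;"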
