| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227 |
- #!/usr/bin/perl
- #
- # Copyright (C) Roman Dmitriev, rnd@rajven.ru
- #
- use utf8;
- use open ":encoding(utf8)";
- use open ':std', ':encoding(UTF-8)';
- use Encode;
- no warnings 'utf8';
- use English;
- use FindBin '$Bin';
- use lib "/opt/Eye/scripts";
- use Getopt::Long qw(GetOptions);
- use Data::Dumper;
- use eyelib::config;
- use eyelib::main;
- use eyelib::database;
- use eyelib::common;
- use eyelib::net_utils;
- use strict;
- use warnings;
- sub get_table_columns {
- my ($db, $table) = @_;
- my $sth = $db->column_info(undef, undef, $table, '%');
- my @cols;
- while (my $row = $sth->fetchrow_hashref) {
- push @cols, $row->{COLUMN_NAME};
- }
- return @cols;
- }
- sub batch_sql_cached {
- my ($db, $sql, $data) = @_;
- eval {
- my $sth = $db->prepare_cached($sql) or die "Unable to prepare SQL: " . $db->errstr;
- for my $params (@$data) {
- next unless @$params;
- $sth->execute(@$params) or die "Unable to execute with params [" . join(',', @$params) . "]: " . $sth->errstr;
- }
- $db->commit() if (!$db->{AutoCommit});
- 1;
- } or do {
- my $err = $@ || 'Unknown error';
- eval { $db->rollback() };
- print "batch_db_sql_cached failed: $err";
- return 0;
- };
- return 1;
- }
- # debug disable force
- $debug = 0;
- # === Разбор аргументов командной строки ===
- my $opt_clear = 0;
- my $opt_batch = 0;
- GetOptions(
- 'clear' => \$opt_clear,
- 'batch' => \$opt_batch,
- ) or die "Usage: $0 [--clear] [--batch]\n";
- # === Явное указание портов ===
- my $MYSQL_PORT = 3306;
- my $PG_PORT = 5432;
- # === Подключение к MySQL (источник) ===
- my $mysql_dsn = "dbi:mysql:database=$DBNAME;host=$DBHOST;port=$MYSQL_PORT;mysql_local_infile=1";
- my $mysql_db = DBI->connect($mysql_dsn, $DBUSER, $DBPASS, {
- RaiseError => 0,
- AutoCommit => 1,
- mysql_enable_utf8 => 1
- });
- if (!defined $mysql_db) {
- die "Cannot connect to MySQL server: $DBI::errstr\n";
- }
- $mysql_db->do('SET NAMES utf8mb4');
- # === Подключение к PostgreSQL (цель) ===
- my $pg_dsn = "dbi:Pg:dbname=$DBNAME;host=$DBHOST;port=$PG_PORT;";
- my $pg_db = DBI->connect($pg_dsn, $DBUSER, $DBPASS, {
- RaiseError => 0,
- AutoCommit => 1,
- pg_enable_utf8 => 1,
- pg_server_prepare => 0
- });
- if (!defined $pg_db) {
- print "Cannot connect to PostgreSQL server: $DBI::errstr\n";
- print "For install/configure PostgreSQL server please run migrate2psql.sh!\n";
- exit 100;
- }
- # === Получение списка таблиц ===
- print "Fetching table list from MySQL...\n";
- my @migration_tables = get_records_sql($mysql_db, 'SHOW TABLES');
- my %tables;
- my $table_index = 'Tables_in_' . $DBNAME;
- foreach my $row (@migration_tables) {
- next unless $row && exists $row->{$table_index};
- my $table_name = $row->{$table_index};
- # Пропускаем traffic_detail (слишком большая)
- $tables{$table_name} = ($table_name !~ /(traffic_detail|sessions)/) ? 1 : 0;
- }
- # Фильтруем только те, что будем мигрировать
- my @tables_to_migrate = sort grep { $tables{$_} } keys %tables;
- my $total_tables = scalar @tables_to_migrate;
- if ($total_tables == 0) {
- print "No tables to migrate!\n";
- exit 0;
- }
- # === Опционально: очистка всех таблиц перед импортом ===
- if ($opt_clear) {
- print "\n⚠️ --clear mode: Truncating all target tables before import...\n";
- for my $table (@tables_to_migrate) {
- eval {
- $pg_db->do("TRUNCATE TABLE \"$table\" RESTART IDENTITY");
- };
- if ($@) {
- chomp $@;
- print " ⚠️ Failed to truncate table '$table': $@\n";
- } else {
- print " → Truncated: $table\n";
- }
- }
- print "\n";
- }
- print "\n=== Starting migration of $total_tables tables ===\n\n";
- # === Миграция по таблицам с прогрессом ===
- for my $idx (0 .. $#tables_to_migrate) {
- my $table = $tables_to_migrate[$idx];
- my $table_num = $idx + 1;
- print "[$table_num/$total_tables] Processing table: $table\n";
- my $rec_count = get_count_records($mysql_db, $table);
- print " → Expected records: $rec_count\n";
- if ($rec_count == 0) {
- print " → Empty table. Skipping.\n\n";
- next;
- }
- # === Построчное чтение ===
- my $select_sth = $mysql_db->prepare("SELECT * FROM `$table`");
- $select_sth->execute();
- my $inserted = 0;
- my $errors = 0;
- # === Режим вставки: построчный или пакетный ===
- if ($opt_batch) {
- print " → Using BATCH mode (500 records per chunk)\n";
- # Получаем список колонок один раз
- my @columns = get_table_columns($mysql_db, $table);
- my $quoted_columns = '"' . join('", "', @columns) . '"';
- my $placeholders = join(', ', ('?') x @columns);
- my $insert_sql = "INSERT INTO \"$table\" ($quoted_columns) VALUES ($placeholders)";
- my @batch_buffer;
- my $chunk_size = 500;
- while (my $row = $select_sth->fetchrow_hashref) {
- my @values;
- for my $key (@columns) {
- my $value = $row->{$key};
- if (lc($key) eq 'ip') {
- $value = undef if !defined($value) || $value eq '';
- }
- push @values, $value;
- }
- push @batch_buffer, \@values;
- if (@batch_buffer >= 500) {
- my $insert_status = batch_sql_cached($pg_db, $insert_sql, \@batch_buffer);
- if ($insert_status) { $inserted += @batch_buffer; } else { $errors+=@batch_buffer; }
- @batch_buffer = ();
- }
- }
- # Остаток
- if (@batch_buffer) {
- my $insert_status = batch_sql_cached($pg_db, $insert_sql, \@batch_buffer);
- if ($insert_status) { $inserted += @batch_buffer; } else { $errors+=@batch_buffer; }
- }
- } else {
- # === построчный режим ===
- while (my $row = $select_sth->fetchrow_hashref) {
- # === Приведение имён полей к нижнему регистру ===
- my %row_normalized;
- while (my ($key, $value) = each %$row) {
- my $n_key = lc($key);
- if ($n_key eq 'ip') {
- if (!defined $value || $value eq '') { $value = undef; }
- }
- $row_normalized{$n_key} = $value;
- }
- my $ret_id = insert_record($pg_db, $table, \%row_normalized);
- if ($ret_id>0) { $inserted++; } else {
- $errors++;
- print Dumper(\%row_normalized) if ($debug);
- }
- }
- $select_sth->finish();
- }
- # === Итог по таблице ===
- my $status = ($errors == 0) ? "✅ SUCCESS" : "⚠️ COMPLETED WITH ERRORS";
- print " → Result: $status\n";
- print " Inserted: $inserted | Errors: $errors | Expected: $rec_count\n";
-
- if ($inserted + $errors != $rec_count) {
- print " ❗ WARNING: Record count mismatch! (source: $rec_count, processed: " . ($inserted + $errors) . ")\n";
- }
-
- print "\n";
- }
- print "🎉 Migration completed! Processed $total_tables tables.\n";
- exit 0;
|