migrate2psql.pl 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. #!/usr/bin/perl
  2. #
  3. # Copyright (C) Roman Dmitriev, rnd@rajven.ru
  4. #
  5. use utf8;
  6. use open ":encoding(utf8)";
  7. use open ':std', ':encoding(UTF-8)';
  8. use Encode;
  9. no warnings 'utf8';
  10. use English;
  11. use FindBin '$Bin';
  12. use lib "/opt/Eye/scripts";
  13. use Getopt::Long qw(GetOptions);
  14. use Data::Dumper;
  15. use eyelib::config;
  16. use eyelib::main;
  17. use eyelib::database;
  18. use eyelib::common;
  19. use eyelib::net_utils;
  20. use strict;
  21. use warnings;
  22. sub get_table_columns {
  23. my ($db, $table) = @_;
  24. my $sth = $db->column_info(undef, undef, $table, '%');
  25. my @cols;
  26. while (my $row = $sth->fetchrow_hashref) {
  27. push @cols, $row->{COLUMN_NAME};
  28. }
  29. return @cols;
  30. }
  31. sub batch_sql_cached {
  32. my ($db, $sql, $data) = @_;
  33. eval {
  34. my $sth = $db->prepare_cached($sql) or die "Unable to prepare SQL: " . $db->errstr;
  35. for my $params (@$data) {
  36. next unless @$params;
  37. $sth->execute(@$params) or die "Unable to execute with params [" . join(',', @$params) . "]: " . $sth->errstr;
  38. }
  39. $db->commit() if (!$db->{AutoCommit});
  40. 1;
  41. } or do {
  42. my $err = $@ || 'Unknown error';
  43. eval { $db->rollback() };
  44. print "batch_db_sql_cached failed: $err";
  45. return 0;
  46. };
  47. return 1;
  48. }
  49. # debug disable force
  50. $debug = 0;
  51. # === Разбор аргументов командной строки ===
  52. my $opt_clear = 0;
  53. my $opt_batch = 0;
  54. GetOptions(
  55. 'clear' => \$opt_clear,
  56. 'batch' => \$opt_batch,
  57. ) or die "Usage: $0 [--clear] [--batch]\n";
  58. # === Явное указание портов ===
  59. my $MYSQL_PORT = 3306;
  60. my $PG_PORT = 5432;
  61. # === Подключение к MySQL (источник) ===
  62. my $mysql_dsn = "dbi:mysql:database=$DBNAME;host=$DBHOST;port=$MYSQL_PORT;mysql_local_infile=1";
  63. my $mysql_db = DBI->connect($mysql_dsn, $DBUSER, $DBPASS, {
  64. RaiseError => 0,
  65. AutoCommit => 1,
  66. mysql_enable_utf8 => 1
  67. });
  68. if (!defined $mysql_db) {
  69. die "Cannot connect to MySQL server: $DBI::errstr\n";
  70. }
  71. $mysql_db->do('SET NAMES utf8mb4');
  72. # === Подключение к PostgreSQL (цель) ===
  73. my $pg_dsn = "dbi:Pg:dbname=$DBNAME;host=$DBHOST;port=$PG_PORT;";
  74. my $pg_db = DBI->connect($pg_dsn, $DBUSER, $DBPASS, {
  75. RaiseError => 0,
  76. AutoCommit => 1,
  77. pg_enable_utf8 => 1,
  78. pg_server_prepare => 0
  79. });
  80. if (!defined $pg_db) {
  81. print "Cannot connect to PostgreSQL server: $DBI::errstr\n";
  82. print "For install/configure PostgreSQL server please run migrate2psql.sh!\n";
  83. exit 100;
  84. }
  85. # === Получение списка таблиц ===
  86. print "Fetching table list from MySQL...\n";
  87. my @migration_tables = get_records_sql($mysql_db, 'SHOW TABLES');
  88. my %tables;
  89. my $table_index = 'Tables_in_' . $DBNAME;
  90. foreach my $row (@migration_tables) {
  91. next unless $row && exists $row->{$table_index};
  92. my $table_name = $row->{$table_index};
  93. # Пропускаем traffic_detail (слишком большая)
  94. $tables{$table_name} = ($table_name !~ /(traffic_detail|sessions)/) ? 1 : 0;
  95. }
  96. # Фильтруем только те, что будем мигрировать
  97. my @tables_to_migrate = sort grep { $tables{$_} } keys %tables;
  98. my $total_tables = scalar @tables_to_migrate;
  99. if ($total_tables == 0) {
  100. print "No tables to migrate!\n";
  101. exit 0;
  102. }
  103. # === Опционально: очистка всех таблиц перед импортом ===
  104. if ($opt_clear) {
  105. print "\n⚠️ --clear mode: Truncating all target tables before import...\n";
  106. for my $table (@tables_to_migrate) {
  107. eval {
  108. $pg_db->do("TRUNCATE TABLE \"$table\" RESTART IDENTITY");
  109. };
  110. if ($@) {
  111. chomp $@;
  112. print " ⚠️ Failed to truncate table '$table': $@\n";
  113. } else {
  114. print " → Truncated: $table\n";
  115. }
  116. }
  117. print "\n";
  118. }
  119. print "\n=== Starting migration of $total_tables tables ===\n\n";
  120. # === Миграция по таблицам с прогрессом ===
  121. for my $idx (0 .. $#tables_to_migrate) {
  122. my $table = $tables_to_migrate[$idx];
  123. my $table_num = $idx + 1;
  124. print "[$table_num/$total_tables] Processing table: $table\n";
  125. my $rec_count = get_count_records($mysql_db, $table);
  126. print " → Expected records: $rec_count\n";
  127. if ($rec_count == 0) {
  128. print " → Empty table. Skipping.\n\n";
  129. next;
  130. }
  131. # === Построчное чтение ===
  132. my $select_sth = $mysql_db->prepare("SELECT * FROM `$table`");
  133. $select_sth->execute();
  134. my $inserted = 0;
  135. my $errors = 0;
  136. # === Режим вставки: построчный или пакетный ===
  137. if ($opt_batch) {
  138. print " → Using BATCH mode (500 records per chunk)\n";
  139. # Получаем список колонок один раз
  140. my @columns = get_table_columns($mysql_db, $table);
  141. my $quoted_columns = '"' . join('", "', @columns) . '"';
  142. my $placeholders = join(', ', ('?') x @columns);
  143. my $insert_sql = "INSERT INTO \"$table\" ($quoted_columns) VALUES ($placeholders)";
  144. my @batch_buffer;
  145. my $chunk_size = 500;
  146. while (my $row = $select_sth->fetchrow_hashref) {
  147. my @values;
  148. for my $key (@columns) {
  149. my $value = $row->{$key};
  150. if (lc($key) eq 'ip') {
  151. $value = undef if !defined($value) || $value eq '';
  152. }
  153. push @values, $value;
  154. }
  155. push @batch_buffer, \@values;
  156. if (@batch_buffer >= 500) {
  157. my $insert_status = batch_sql_cached($pg_db, $insert_sql, \@batch_buffer);
  158. if ($insert_status) { $inserted += @batch_buffer; } else { $errors+=@batch_buffer; }
  159. @batch_buffer = ();
  160. }
  161. }
  162. # Остаток
  163. if (@batch_buffer) {
  164. my $insert_status = batch_sql_cached($pg_db, $insert_sql, \@batch_buffer);
  165. if ($insert_status) { $inserted += @batch_buffer; } else { $errors+=@batch_buffer; }
  166. }
  167. } else {
  168. # === построчный режим ===
  169. while (my $row = $select_sth->fetchrow_hashref) {
  170. # === Приведение имён полей к нижнему регистру ===
  171. my %row_normalized;
  172. while (my ($key, $value) = each %$row) {
  173. my $n_key = lc($key);
  174. if ($n_key eq 'ip') {
  175. if (!defined $value || $value eq '') { $value = undef; }
  176. }
  177. $row_normalized{$n_key} = $value;
  178. }
  179. my $ret_id = insert_record($pg_db, $table, \%row_normalized);
  180. if ($ret_id>0) { $inserted++; } else {
  181. $errors++;
  182. print Dumper(\%row_normalized) if ($debug);
  183. }
  184. }
  185. $select_sth->finish();
  186. }
  187. # === Итог по таблице ===
  188. my $status = ($errors == 0) ? "✅ SUCCESS" : "⚠️ COMPLETED WITH ERRORS";
  189. print " → Result: $status\n";
  190. print " Inserted: $inserted | Errors: $errors | Expected: $rec_count\n";
  191. if ($inserted + $errors != $rec_count) {
  192. print " ❗ WARNING: Record count mismatch! (source: $rec_count, processed: " . ($inserted + $errors) . ")\n";
  193. }
  194. print "\n";
  195. }
  196. print "🎉 Migration completed! Processed $total_tables tables.\n";
  197. exit 0;