migrate2psql.pl 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360
  1. #!/usr/bin/perl
  2. #
  3. # Copyright (C) Roman Dmitriev, rnd@rajven.ru
  4. #
  5. use utf8;
  6. use open ":encoding(utf8)";
  7. use open ':std', ':encoding(UTF-8)';
  8. use Encode;
  9. no warnings 'utf8';
  10. use English;
  11. use FindBin '$Bin';
  12. use lib "/opt/Eye/scripts";
  13. use Getopt::Long qw(GetOptions);
  14. use Data::Dumper;
  15. use eyelib::config;
  16. use eyelib::main;
  17. use eyelib::database;
  18. use eyelib::common;
  19. use eyelib::net_utils;
  20. use strict;
  21. use warnings;
  22. my $chunk_count = 1000;
  23. sub batch_sql_cached {
  24. my ($db, $sql, $data) = @_;
  25. # Запоминаем исходное состояние AutoCommit
  26. my $original_autocommit = $db->{AutoCommit};
  27. eval {
  28. # Выключаем AutoCommit для транзакции
  29. $db->{AutoCommit} = 0;
  30. my $sth = $db->prepare_cached($sql) or die "Unable to prepare SQL: " . $db->errstr;
  31. for my $params (@$data) {
  32. next unless @$params;
  33. $sth->execute(@$params) or die "Unable to execute with params [" . join(',', @$params) . "]: " . $sth->errstr;
  34. }
  35. $db->commit();
  36. 1;
  37. } or do {
  38. my $err = $@ || 'Unknown error';
  39. eval { $db->rollback() };
  40. warn "batch_sql_cached failed: $err";
  41. # Восстанавливаем AutoCommit даже при ошибке
  42. $db->{AutoCommit} = $original_autocommit;
  43. return 0;
  44. };
  45. # Восстанавливаем исходный режим AutoCommit
  46. $db->{AutoCommit} = $original_autocommit;
  47. return 1;
  48. }
  49. # debug disable force
  50. $debug = 0;
  51. # === Разбор аргументов командной строки ===
  52. my $opt_clear = 0;
  53. my $opt_batch = 0;
  54. GetOptions(
  55. 'clear' => \$opt_clear,
  56. 'batch' => \$opt_batch,
  57. ) or die "Usage: $0 [--clear] [--batch]\n";
  58. # === Явное указание портов ===
  59. my $MYSQL_PORT = 3306;
  60. my $PG_PORT = 5432;
  61. # === Подключение к MySQL (источник) ===
  62. my $mysql_dsn = "dbi:mysql:database=$DBNAME;host=$DBHOST;port=$MYSQL_PORT;mysql_local_infile=1";
  63. my $mysql_db = DBI->connect($mysql_dsn, $DBUSER, $DBPASS, {
  64. RaiseError => 0,
  65. AutoCommit => 1,
  66. mysql_enable_utf8 => 1
  67. });
  68. if (!defined $mysql_db) {
  69. die "Cannot connect to MySQL server: $DBI::errstr\n";
  70. }
  71. $mysql_db->do('SET NAMES utf8mb4');
  72. # === Подключение к PostgreSQL (цель) ===
  73. my $pg_dsn = "dbi:Pg:dbname=$DBNAME;host=$DBHOST;port=$PG_PORT;";
  74. my $pg_db = DBI->connect($pg_dsn, $DBUSER, $DBPASS, {
  75. RaiseError => 0,
  76. AutoCommit => 1,
  77. pg_enable_utf8 => 1,
  78. pg_server_prepare => 0
  79. });
  80. if (!defined $pg_db) {
  81. print "Cannot connect to PostgreSQL server: $DBI::errstr\n";
  82. print "For install/configure PostgreSQL server please run migrate2psql.sh!\n";
  83. exit 100;
  84. }
  85. # === Получение списка таблиц ===
  86. print "Fetching table list from MySQL...\n";
  87. my @migration_tables = get_records_sql($mysql_db, 'SHOW TABLES');
  88. my %tables;
  89. my $table_index = 'Tables_in_' . $DBNAME;
  90. foreach my $row (@migration_tables) {
  91. next unless $row && exists $row->{$table_index};
  92. my $table_name = $row->{$table_index};
  93. # Пропускаем traffic_detail (слишком большая)
  94. $tables{$table_name} = ($table_name !~ /(traffic_detail|sessions)/) ? 1 : 0;
  95. }
  96. # Фильтруем только те, что будем мигрировать
  97. my @tables_to_migrate = sort grep { $tables{$_} } keys %tables;
  98. my $total_tables = scalar @tables_to_migrate;
  99. if ($total_tables == 0) {
  100. print "No tables to migrate!\n";
  101. exit 0;
  102. }
  103. # === Опционально: очистка всех таблиц перед импортом ===
  104. if ($opt_clear) {
  105. print "\n⚠️ --clear mode: Truncating all target tables before import...\n";
  106. for my $table (@tables_to_migrate) {
  107. eval {
  108. $pg_db->do("TRUNCATE TABLE \"$table\" RESTART IDENTITY");
  109. };
  110. if ($@) {
  111. chomp $@;
  112. print " ⚠️ Failed to truncate table '$table': $@\n";
  113. } else {
  114. print " → Truncated: $table\n";
  115. }
  116. }
  117. print "\n";
  118. }
  119. print "\n=== Check DB schema ===\n\n";
  120. # === Сбор полной схемы из обеих БД ===
  121. print "Fetching schema from MySQL and PostgreSQL...\n";
  122. # === Сбор схем ===
  123. my %schema;
  124. for my $table (@tables_to_migrate) {
  125. next if $table =~ /(traffic_detail|sessions)/i;
  126. $schema{mysql}{$table} = { get_table_columns($mysql_db, $table) };
  127. }
  128. my @pg_tables = map { $_->{tablename} } get_records_sql($pg_db, "SELECT tablename FROM pg_tables WHERE schemaname = 'public'");
  129. for my $table (@pg_tables) {
  130. next if $table =~ /(traffic_detail|sessions)/i;
  131. $schema{pg}{$table} = { get_table_columns($pg_db, $table) };
  132. }
  133. # === Флаг ошибки ===
  134. my $has_critical_error = 0;
  135. # === 1. Проверка: всё ли из PostgreSQL есть в MySQL? ===
  136. for my $table (keys %{ $schema{pg} }) {
  137. if (!exists $schema{mysql}{$table}) {
  138. print "❗ ERROR: Table '$table' exists in PostgreSQL but not in MySQL!\n";
  139. $has_critical_error = 1;
  140. next;
  141. }
  142. for my $col (keys %{ $schema{pg}{$table} }) {
  143. if (!exists $schema{mysql}{$table}{$col}) {
  144. print "❗ ERROR: Column '$col' in table '$table' exists in PostgreSQL but not in MySQL!\n";
  145. $has_critical_error = 1;
  146. }
  147. }
  148. }
  149. # === 2. Проверка: есть ли лишнее в MySQL? ===
  150. for my $table (keys %{ $schema{mysql} }) {
  151. if (!exists $schema{pg}{$table}) {
  152. print "⚠️ WARNING: Table '$table' exists in MySQL but not in PostgreSQL — will be skipped.\n";
  153. next;
  154. }
  155. for my $col (keys %{ $schema{mysql}{$table} }) {
  156. if (!exists $schema{pg}{$table}{$col}) {
  157. print "⚠️ WARNING: Column '$col' in table '$table' exists in MySQL but not in PostgreSQL — will be ignored.\n";
  158. }
  159. }
  160. }
  161. if ($has_critical_error) {
  162. print "\nSchema validation failed: missing required tables/columns in source MySQL database.\n";
  163. exit 103;
  164. }
  165. print "✅ Schema validation passed.\n\n";
  166. print "\n=== Starting migration of $total_tables tables ===\n\n";
  167. # === Миграция по таблицам с прогрессом ===
  168. for my $idx (0 .. $#tables_to_migrate) {
  169. my $table = $tables_to_migrate[$idx];
  170. my $table_num = $idx + 1;
  171. if (!exists $schema{pg}->{$table}) { next; }
  172. print "[$table_num/$total_tables] Processing table: $table\n";
  173. my $rec_count = get_count_records($mysql_db, $table);
  174. print " → Expected records: $rec_count\n";
  175. if ($rec_count == 0) {
  176. print " → Empty table. Skipping.\n\n";
  177. next;
  178. }
  179. my $inserted = 0;
  180. my $errors = 0;
  181. # === Построчное чтение ===
  182. my $select_sth = $mysql_db->prepare("SELECT * FROM `$table`");
  183. $select_sth->execute();
  184. # === Режим вставки: построчный или пакетный ===
  185. if ($opt_batch) {
  186. print " → Using BATCH mode ($chunk_count records per chunk)\n";
  187. # Берём колонки напрямую из PostgreSQL-схемы — они все есть в MySQL
  188. my @valid_columns = sort keys %{ $schema{pg}{$table} };
  189. my $quoted_columns = '"' . join('", "', @valid_columns) . '"';
  190. my $placeholders = join(', ', ('?') x @valid_columns);
  191. my $insert_sql = "INSERT INTO \"$table\" ($quoted_columns) VALUES ($placeholders)";
  192. my @batch_buffer;
  193. my $chunk_size = $chunk_count;
  194. my $processed = 0;
  195. my $report_every = 10_000;
  196. while (my $row = $select_sth->fetchrow_hashref) {
  197. my @values;
  198. for my $col (@valid_columns) {
  199. my $raw_value = $row->{$col};
  200. my $norm_value = normalize_value($raw_value, $schema{pg}{$table}{$col});
  201. push @values, $norm_value;
  202. }
  203. push @batch_buffer, \@values;
  204. $processed++;
  205. if (@batch_buffer >= $chunk_count) {
  206. my $insert_status = batch_sql_cached($pg_db, $insert_sql, \@batch_buffer);
  207. if ($insert_status) {
  208. $inserted += @batch_buffer;
  209. } else {
  210. $errors += @batch_buffer;
  211. }
  212. @batch_buffer = ();
  213. # Прогресс
  214. if ($processed % $report_every == 0) {
  215. my $pct = int($processed * 100 / $rec_count);
  216. printf " → Processed: %d / %d (%d%%)\r", $processed, $rec_count, $pct;
  217. $| = 1; # flush STDOUT
  218. }
  219. }
  220. }
  221. # Остаток
  222. if (@batch_buffer) {
  223. my $insert_status = batch_sql_cached($pg_db, $insert_sql, \@batch_buffer);
  224. if ($insert_status) {
  225. $inserted += @batch_buffer;
  226. } else {
  227. $errors += @batch_buffer;
  228. }
  229. $processed += @batch_buffer;
  230. }
  231. # Финальная строка
  232. printf " → Processed: %d / %d (100%%)\n", $processed, $rec_count;
  233. } else {
  234. # === Построчный режим ===
  235. my $processed = 0;
  236. my $report_every = 10_000;
  237. while (my $row = $select_sth->fetchrow_hashref) {
  238. my %row_normalized;
  239. for my $col (keys %{ $schema{pg}{$table} }) {
  240. my $raw_value = $row->{$col};
  241. my $norm_value = normalize_value($raw_value, $schema{pg}{$table}{$col});
  242. $row_normalized{$col} = $norm_value;
  243. }
  244. my $ret_id = insert_record($pg_db, $table, \%row_normalized);
  245. if ($ret_id > 0) {
  246. $inserted++;
  247. } else {
  248. $errors++;
  249. print Dumper(\%row_normalized) if ($debug);
  250. }
  251. $processed++;
  252. # Прогресс каждые N строк
  253. if ($rec_count > 0 && $processed % $report_every == 0) {
  254. my $pct = int($processed * 100 / $rec_count);
  255. printf " → Processed: %d / %d (%d%%)\r", $processed, $rec_count, $pct;
  256. $| = 1; # flush
  257. }
  258. }
  259. $select_sth->finish();
  260. # Финальная строка
  261. if ($rec_count > 0) {
  262. printf " → Processed: %d / %d (100%%)\n", $processed, $rec_count;
  263. } else {
  264. print " → Processed: $processed records\n";
  265. }
  266. }
  267. # === Итог по таблице ===
  268. my $status = ($errors == 0) ? "✅ SUCCESS" : "⚠️ COMPLETED WITH ERRORS";
  269. print " → Result: $status\n";
  270. print " Inserted: $inserted | Errors: $errors | Expected: $rec_count\n";
  271. if ($inserted + $errors != $rec_count) {
  272. print " ❗ WARNING: Record count mismatch! (source: $rec_count, processed: " . ($inserted + $errors) . ")\n";
  273. }
  274. print "\n";
  275. }
  276. print "\n=== Resetting all table sequences ===\n";
  277. # Получаем список всех таблиц из целевой схемы (PostgreSQL)
  278. my $tables_sql = "SELECT tablename FROM pg_tables WHERE schemaname = 'public'";
  279. my $sth = $pg_db->prepare($tables_sql);
  280. $sth->execute();
  281. while (my ($table) = $sth->fetchrow_array) {
  282. # Формируем имя последовательности
  283. my $seq_name = "${table}_id_seq";
  284. # Проверяем, существует ли такая последовательность
  285. my ($exists) = $pg_db->selectrow_array(
  286. "SELECT 1 FROM pg_class WHERE relname = ? AND relkind = 'S'",
  287. undef, $seq_name
  288. );
  289. if ($exists) {
  290. # Получаем MAX(id)
  291. my ($max_id) = $pg_db->selectrow_array("SELECT MAX(id) FROM \"$table\"");
  292. $max_id //= 1;
  293. # Сбрасываем последовательность
  294. $pg_db->do("SELECT setval('$seq_name', $max_id)");
  295. print " → $table: sequence reset to $max_id\n";
  296. }
  297. }
  298. print "✅ All sequences updated.\n";
  299. print "🎉 Migration completed! Processed $total_tables tables.\n";
  300. exit 0;