Prechádzať zdrojové kódy

fixed import data from csv for postgres

Dmitriev Roman 3 mesiacov pred
rodič
commit
2474bc9e9c

+ 11 - 11
docs/databases/postgres/en/create_db.sql

@@ -423,23 +423,23 @@ COMMENT ON COLUMN subnets.office IS 'This is an office subnet';
 COMMENT ON COLUMN subnets.hotspot IS 'This is a public/guest subnet';
 COMMENT ON COLUMN subnets.notify IS 'Notification bitmask: 1=email, 2=sms, 4=telegram';
 
--- Detailed traffic logs
 CREATE TABLE traffic_detail (
 id BIGSERIAL PRIMARY KEY,
-auth_id BIGINT,
-router_id INTEGER NOT NULL DEFAULT 0,
+auth_id   bigint,
+router_id integer NOT NULL DEFAULT 0,
 ts TIMESTAMP,
-proto SMALLINT,
-src_ip INTEGER NOT NULL,
-dst_ip INTEGER NOT NULL,
-src_port INTEGER NOT NULL,
-dst_port INTEGER NOT NULL,
-bytes BIGINT NOT NULL,
-pkt INTEGER NOT NULL DEFAULT 0
+proto     smallint,
+src_ip    bigint NOT NULL DEFAULT 0,
+dst_ip    bigint NOT NULL DEFAULT 0,
+src_port  integer NOT NULL DEFAULT 0,
+dst_port  integer NOT NULL DEFAULT 0,
+bytes     bigint NOT NULL DEFAULT 0,
+pkt       bigint NOT NULL DEFAULT 0,
 );
 COMMENT ON TABLE traffic_detail IS 'Detailed traffic flow records (NetFlow)';
 COMMENT ON COLUMN traffic_detail.proto IS 'IP protocol number';
 COMMENT ON COLUMN traffic_detail.src_ip IS 'Source IP as integer';
+COMMENT ON COLUMN traffic_detail.dst_ip IS 'Destination IP as integer';
 COMMENT ON COLUMN traffic_detail.bytes IS 'Bytes transferred in this flow';
 
 -- Unknown MAC addresses
@@ -601,7 +601,7 @@ COMMENT ON TABLE version IS 'System version information';
 
 -- WAN interface statistics
 CREATE TABLE wan_stats (
-id SERIAL PRIMARY KEY,
+id BIGSERIAL PRIMARY KEY,
 ts TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
 router_id INTEGER,
 interface_id INTEGER,

+ 12 - 11
docs/databases/postgres/ru/create_db.sql

@@ -421,25 +421,26 @@ COMMENT ON COLUMN subnets.subnet IS 'Сеть в нотации CIDR';
 COMMENT ON COLUMN subnets.vlan_tag IS 'ID VLAN для этой подсети';
 COMMENT ON COLUMN subnets.office IS 'Это офисная подсеть';
 COMMENT ON COLUMN subnets.hotspot IS 'Это публичная/гостевая подсеть';
-COMMENT ON COLUMN subnets.notify IS 'Битовая маска для уведомлений: 1=email, 2=sms, 4=telegram';
+COMMENT ON COLUMN subnets.notify IS 'Битовая маска для уведомлений по типу событий';
 
 -- Подробные логи трафика
 CREATE TABLE traffic_detail (
 id BIGSERIAL PRIMARY KEY,
-auth_id BIGINT,
-router_id INTEGER NOT NULL DEFAULT 0,
+auth_id   bigint,
+router_id integer NOT NULL DEFAULT 0,
 ts TIMESTAMP,
-proto SMALLINT,
-src_ip INTEGER NOT NULL,
-dst_ip INTEGER NOT NULL,
-src_port INTEGER NOT NULL,
-dst_port INTEGER NOT NULL,
-bytes BIGINT NOT NULL,
-pkt INTEGER NOT NULL DEFAULT 0
+proto     smallint,
+src_ip    bigint NOT NULL DEFAULT 0,
+dst_ip    bigint NOT NULL DEFAULT 0,
+src_port  integer NOT NULL DEFAULT 0,
+dst_port  integer NOT NULL DEFAULT 0,
+bytes     bigint NOT NULL DEFAULT 0,
+pkt       bigint NOT NULL DEFAULT 0,
 );
 COMMENT ON TABLE traffic_detail IS 'Подробные записи потоков трафика (NetFlow)';
 COMMENT ON COLUMN traffic_detail.proto IS 'Номер IP протокола';
 COMMENT ON COLUMN traffic_detail.src_ip IS 'Исходный IP в виде целого числа';
+COMMENT ON COLUMN traffic_detail.dst_ip IS 'Адрес назначения IP в виде целого числа';
 COMMENT ON COLUMN traffic_detail.bytes IS 'Байтов переданно в этом потоке';
 
 -- Неизвестные MAC-адреса
@@ -601,7 +602,7 @@ COMMENT ON TABLE version IS 'Информация о версии системы
 
 -- Статистика WAN интерфейсов
 CREATE TABLE wan_stats (
-id SERIAL PRIMARY KEY,
+id BIGSERIAL PRIMARY KEY,
 ts TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
 router_id INTEGER,
 interface_id INTEGER,

+ 4 - 2
install-eye.sh

@@ -283,7 +283,8 @@ install_deps_altlinux() {
             perl-DateTime-Format-DateParse perl-DateTime-Format-Strptime \
             perl-Net-OpenSSH perl-File-Tail perl-Tie-File \
             perl-Crypt-Rijndael perl-Crypt-CBC perl-CryptX perl-Crypt-DES \
-            perl-File-Path-Tiny perl-Expect perl-Proc-ProcessTable
+            perl-File-Path-Tiny perl-Expect perl-Proc-ProcessTable \
+            perl-Text-CSV
 
         # Специфичные DBD-драйверы
         if [[ "$DB_TYPE" == "postgresql" ]]; then
@@ -347,7 +348,8 @@ install_deps_debian() {
             libdatetime-format-dateparse-perl libnetwork-ipv4addr-perl \
             libnet-openssh-perl libfile-tail-perl libdatetime-format-strptime-perl \
             libcrypt-rijndael-perl libcrypt-cbc-perl libcryptx-perl \
-            libcrypt-des-perl libfile-path-tiny-perl libexpect-perl
+            libcrypt-des-perl libfile-path-tiny-perl libexpect-perl \
+            libtext-csv-perl
 
         # DBD-драйверы
         if [[ "$DB_TYPE" == "postgresql" ]]; then

+ 2 - 0
scripts/eye-statd.pl

@@ -911,6 +911,8 @@ if ($config_ref{enable_quotes}) {
 
 if (scalar(@detail_traffic)) {
     db_log_debug($hdb,"Start write traffic detail to DB. ".scalar @detail_traffic." lines count") if ($debug);
+    my $traffic_fields = ['auth_id', 'router_id', 'ts', 'proto', 'src_ip', 'dst_ip', 'src_port', 'dst_port', 'bytes', 'pkt'];
+    unshift @detail_traffic, $traffic_fields;
     batch_db_sql_csv("traffic_detail",\@detail_traffic);
     @detail_traffic = ();
     db_log_debug($hdb,"Write traffic detail to DB stopped") if ($debug);

+ 203 - 36
scripts/eyelib/database.pm

@@ -22,6 +22,7 @@ use POSIX qw(mktime ctime strftime);
 use File::Temp qw(tempfile);
 use DBI;
 use DBD::Pg qw(:pg_types);
+use Text::CSV;
 
 our @ISA = qw(Exporter);
 
@@ -125,20 +126,14 @@ return $res;
 #---------------------------------------------------------------------------------------------------------------
 sub batch_db_sql_cached {
     my ($sql, $data) = @_;
-
     my $db=init_db();
-
     eval {
-        my $sth = $db->prepare_cached($sql)
-            or die "Unable to prepare SQL: " . $db->errstr;
-
+        my $sth = $db->prepare_cached($sql) or die "Unable to prepare SQL: " . $db->errstr;
         for my $params (@$data) {
             next unless @$params;
-            $sth->execute(@$params)
-                or die "Unable to execute with params [" . join(',', @$params) . "]: " . $sth->errstr;
+            $sth->execute(@$params) or die "Unable to execute with params [" . join(',', @$params) . "]: " . $sth->errstr;
         }
-
-        $db->commit();
+        $db->commit() if (!$db->{AutoCommit});
         1;
     } or do {
         my $err = $@ || 'Unknown error';
@@ -146,7 +141,6 @@ sub batch_db_sql_cached {
         $db->disconnect();
         die "batch_db_sql_cached failed: $err";
     };
-
     $db->disconnect();
     return 1;
 }
@@ -155,42 +149,212 @@ sub batch_db_sql_cached {
 
 sub batch_db_sql_csv {
     my ($table, $data) = @_;
+    return 0 unless @$data;
+
+    # Первая строка — заголовки (имена столбцов)
+    my $header_row = shift @$data;
+    unless ($header_row && ref($header_row) eq 'ARRAY' && @$header_row) {
+        log_error("First row must be column names (array reference)");
+        return 0;
+    }
+    my @columns = @$header_row;
+
+    # Теперь @$data содержит только строки данных
+    my $data_rows = $data;
+
+    # Если нет данных — только заголовок
+    unless (@$data_rows) {
+        log_debug("No data rows to insert, only header");
+        return 1;
+    }
+
     my $db = init_db();
+
     if ($config_ref{DBTYPE} eq 'mysql') {
+        # --- MySQL: попытка LOAD DATA, fallback на INSERT ---
+        log_debug("Using LOAD DATA LOCAL INFILE for MySQL");
+
         my $fh = File::Temp->new(UNLINK => 1);
-	my $fname = $fh->filename;
+        my $fname = $fh->filename;
         binmode($fh, ':utf8');
-	foreach my $row (@$data) {
-	    next if (!$row);
-	    my @tmp = @$row;
-	    my $values = 'NULL';
-	    for (my $i = 0; $i <@tmp ; $i++) {
-		$values.=',"'.$tmp[$i].'"';
-		}
-	    $values =~s/,$//;
-	    print $fh $values."\r\n";
-	    }
+
+        my $csv = Text::CSV->new({
+            binary         => 1,
+            quote_char     => '"',
+            escape_char    => '"',
+            sep_char       => ',',
+            eol            => "\r\n",
+            always_quote   => 1,
+        }) or do {
+            my $err = "Cannot create Text::CSV: " . Text::CSV->error_diag();
+            log_error($err);
+            $db->disconnect();
+            return 0;
+        };
+
+        # Пишем заголовок
+        $csv->print($fh, \@columns);
+
+        # Пишем данные
+        for my $row (@$data_rows) {
+            next unless $row && ref($row) eq 'ARRAY' && @$row == @columns;
+            my @vals = map { defined($_) ? $_ : 'NULL' } @$row;
+            $csv->print($fh, \@vals);
+        }
         close $fh;
-	my $query = qq{ LOAD DATA LOCAL INFILE '$fname' INTO TABLE $table FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '"' LINES TERMINATED BY '\r\n'; };
-        $db->do($query);
-    } else {
-        # PostgreSQL: используем COPY ... FROM STDIN
-        my $copy_sql = "COPY $table FROM STDIN WITH (FORMAT CSV, DELIMITER ',', NULL 'NULL')";
-        $db->do($copy_sql);  # Переключает соединение в режим копирования
-        for my $row (@$data) {
-            next unless $row && @$row;
-            my $line = 'NULL';  # автоинкремент
-            for my $val (@$row) {
-                $line .= defined($val) ? ',' . $val : ',NULL';
+
+        my $col_list = join(', ', map { $db->quote_identifier($_) } @columns);
+        my $query = qq{LOAD DATA LOCAL INFILE '$fname' INTO TABLE $table FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '"' LINES TERMINATED BY '\r\n' IGNORE 1 LINES ($col_list)};
+
+        my $load_ok = eval { $db->do($query); 1 };
+        if (!$load_ok) {
+            my $err = "MySQL LOAD DATA failed: $@";
+            log_error($err);
+            log_debug("Falling back to bulk INSERT for MySQL");
+            goto FALLBACK_INSERT_MYSQL;
+        }
+
+        $db->disconnect();
+        return 1;
+
+        # ========================
+        # Fallback для MySQL
+        # ========================
+        FALLBACK_INSERT_MYSQL:
+        {
+            my $quoted_cols = join(', ', map { $db->quote_identifier($_) } @columns);
+            my $placeholders = join(',', ('?') x @columns);
+            my $sql = "INSERT INTO $table ($quoted_cols) VALUES ($placeholders)";
+            my $sth = $db->prepare($sql);
+
+            my $success = eval {
+                for my $row (@$data_rows) {
+                    next unless $row && ref($row) eq 'ARRAY' && @$row == @columns;
+                    my @vals = map { defined($_) ? $_ : undef } @$row;
+                    $sth->execute(@vals);
+                }
+                1;
+            };
+
+            if (!$success) {
+                my $err = "MySQL bulk INSERT failed: $@";
+                log_error($err);
+                $db->disconnect();
+                return 0;
+            }
+        }
+
+    } elsif ($config_ref{DBTYPE} eq 'postgresql') {
+        unless ($db->{Driver}->{Name} eq 'Pg') {
+            my $err = "PostgreSQL expected but connected via " . $db->{Driver}->{Name};
+            log_error($err);
+            $db->disconnect();
+            return 0;
+        }
+
+        if (!$db->can('pg_putcopydata') || !$db->can('pg_putcopyend')) {
+            log_debug("pg_putcopydata/pg_putcopyend not available — falling back to bulk INSERT");
+            goto FALLBACK_INSERT_PG;
+        }
+
+        my $col_list = join(', ', map { $db->quote_identifier($_) } @columns);
+        my $copy_sql = "COPY $table ($col_list) FROM STDIN WITH (FORMAT CSV, HEADER true)";
+
+        my $use_header_as_data;
+        my $start_ok = eval { $db->do($copy_sql); 1 };
+
+        if (!$start_ok) {
+            log_debug("COPY with HEADER failed: $@ — trying without HEADER");
+            $copy_sql = "COPY $table ($col_list) FROM STDIN WITH (FORMAT CSV)";
+            $start_ok = eval { $db->do($copy_sql); 1 };
+            if (!$start_ok) {
+                log_debug("COPY failed entirely: $@ — falling back to bulk INSERT");
+                goto FALLBACK_INSERT_PG;
+            }
+            $use_header_as_data = 1;
+        } else {
+            $use_header_as_data = 0;
+        }
+
+        log_debug("Using CSV COPY for PostgreSQL");
+
+        my $csv = Text::CSV->new({
+            binary         => 1,
+            quote_char     => '"',
+            escape_char    => '"',
+            sep_char       => ',',
+            eol            => "\n",
+            always_quote   => 1,
+        }) or do {
+            my $err = "Cannot create Text::CSV: " . Text::CSV->error_diag();
+            log_error($err);
+            eval { $db->pg_putcopyend(); };
+            $db->disconnect();
+            return 0;
+        };
+
+        my $success = eval {
+            if ($use_header_as_data) {
+                $csv->combine(@columns);
+                $db->pg_putcopydata($csv->string);
+            }
+            for my $row (@$data_rows) {
+                next unless $row && ref($row) eq 'ARRAY' && @$row == @columns;
+                my @vals = map { defined($_) ? $_ : undef } @$row;
+                $csv->combine(@vals);
+                $db->pg_putcopydata($csv->string);
+            }
+            $db->pg_putcopyend();
+            1;
+        };
+
+        if ($success) {
+            $db->disconnect();
+            return 1;
+        } else {
+            my $err = "CSV COPY failed: $@";
+            log_error($err);
+            eval { $db->pg_putcopyend(); };
+            goto FALLBACK_INSERT_PG;
+        }
+
+        # ========================
+        # Fallback для PostgreSQL
+        # ========================
+        FALLBACK_INSERT_PG:
+        {
+            my $quoted_cols = join(', ', map { $db->quote_identifier($_) } @columns);
+            my $placeholders = join(',', ('?') x @columns);
+            my $sql = "INSERT INTO $table ($quoted_cols) VALUES ($placeholders)";
+            my $sth = $db->prepare($sql);
+
+            my $success = eval {
+                for my $row (@$data_rows) {
+                    next unless $row && ref($row) eq 'ARRAY' && @$row == @columns;
+                    my @vals = map { defined($_) ? $_ : undef } @$row;
+                    $sth->execute(@vals);
+                }
+                1;
+            };
+
+            if (!$success) {
+                my $err = "PostgreSQL bulk INSERT failed: $@";
+                log_error($err);
+                $db->disconnect();
+                return 0;
             }
-            $line .= "\n";
-            $db->pg_put_copy_data($line);
         }
-        $db->pg_put_copy_end();  # Завершаем копирование
+
+    } else {
+        my $err = "Unsupported DBTYPE: '$config_ref{DBTYPE}'";
+        log_error($err);
+        $db->disconnect();
+        return 0;
     }
+
     $db->disconnect();
+    return 1;
 }
-
 #---------------------------------------------------------------------------------------------------------------
 
 sub reconnect_db {
@@ -1110,5 +1274,8 @@ if ($MY_NAME!~/upgrade.pl/) {
     Set_Variable($dbh);
     }
 
+#warn "DBI driver name: ", $dbh->{Driver}->{Name}, "\n";
+#warn "Full dbh class: ", ref($dbh), "\n";
+
 1;
 }