Przeglądaj źródła

bugfix: change mysql database charset in php to utf8mb4
speedup netflow parser

rajven 4 lat temu
rodzic
commit
f07b21d307
3 zmienionych plików z 62 dodań i 23 usunięć
  1. 3 4
      html/inc/sql.php
  2. 29 1
      scripts/Rstat/mysql.pm
  3. 30 18
      scripts/parse_flow.pl

+ 3 - 4
html/inc/sql.php

@@ -8,13 +8,12 @@ if (! $db_link) {
     echo "Код ошибки errno: " . mysqli_connect_errno() . PHP_EOL;
     echo "Текст ошибки error: " . mysqli_connect_error() . PHP_EOL;
     exit();
-}
+    }
 
 /* изменение набора символов на utf8 */
-mysqli_query($db_link, "SET NAMES utf8");
-if (! mysqli_set_charset($db_link, "utf8")) {
+if (!mysqli_set_charset($db_link,'utf8mb4')) {
     printf("Ошибка при загрузке набора символов utf8: %s\n", mysqli_error($db_link));
     exit();
-}
+    }
 
 ?>

+ 29 - 1
scripts/Rstat/mysql.pm

@@ -23,6 +23,7 @@ our @ISA = qw(Exporter);
 
 our @EXPORT = qw(
 batch_db_sql
+batch_db_sql_cached
 db_log_warning
 db_log_debug
 db_log_error
@@ -121,7 +122,6 @@ my $sth;
 if (ref($batch_sql) eq 'ARRAY') {
     foreach my $sSQL (@$batch_sql) {
         next if (!$sSQL);
-        print "$sSQL\n";
         $sth = $db->prepare($sSQL);
         $sth->execute;
         $apply = 1;
@@ -141,6 +141,34 @@ $db->{AutoCommit} = 1;
 
 #---------------------------------------------------------------------------------------------------------------
 
+sub batch_db_sql_cached {
+
+my $db = DBI->connect("dbi:mysql:database=$DBNAME;host=$DBHOST","$DBUSER","$DBPASS", { RaiseError => 1, AutoCommit => 0 });
+if ( !defined $db ) { die "Cannot connect to mySQL server: $DBI::errstr\n"; }
+$db->do('SET NAMES utf8');
+$db->{mysql_auto_reconnect} = 1;
+
+my $table= shift;
+my $batch_sql=shift;
+
+return if (!$db);
+if (ref($batch_sql) eq 'ARRAY') {
+    my $sth = $dbh->prepare_cached($table);
+    db_log_debug($db,"Start prepare data");
+    foreach my $sSQL (@$batch_sql) {
+        next if (!$sSQL);
+        $sth->execute(@$sSQL);
+        }
+    db_log_debug($db,"End prepare data");
+    }
+db_log_debug($db,"Start commit");
+$db->commit();
+db_log_debug($db,"End commit");
+$db->disconnect();
+}
+
+#---------------------------------------------------------------------------------------------------------------
+
 sub do_sql {
 my $db=shift;
 my $sql=shift;

+ 30 - 18
scripts/parse_flow.pl

@@ -84,6 +84,7 @@ my ($pid, $exit, $ident, $signal, $core, $data) = @_;
 if ($data) {
     my $dataref = ${$data};
     my $f_dbh=init_db();
+    db_log_debug($f_dbh,"Run get data from child") if ($debug);
     foreach my $user_ip (keys %{$dataref->{stats}}) {
         $user_stats{$user_ip}{in} += $dataref->{stats}{$user_ip}{in};
         $user_stats{$user_ip}{pkt_in} +=$dataref->{stats}{$user_ip}{pkt_in};
@@ -100,7 +101,7 @@ if ($data) {
     $stats{line}{free}+=$dataref->{line}{free};
     $stats{line}{unknown}+=$dataref->{line}{unknown};
     $last_time = $dataref->{last_time};
-    if (scalar(@{$dataref->{sql}})) { batch_db_sql($f_dbh,\@{$dataref->{sql}}); }
+    db_log_debug($f_dbh,"Get data from child stopped") if ($debug);
     $f_dbh->disconnect;
     }
 }
@@ -113,24 +114,24 @@ my $line_count = 0;
 
 my $child_count = 0;
 
-while (my $line = <STDIN>) {
-chomp($line);
-$line=~s/\s+//g;
+while (my $raw_line = <STDIN>) {
+chomp($raw_line);
+$raw_line=~s/\s+//g;
 $line_count++;
-push(@input_buf,$line);
-if ($line_count < 50000) { next; }
+push(@input_buf,$raw_line);
+if ($line_count < 50000 and $raw_line =~ /\S/) { next; }
 $line_count = 0;
 $child_count ++;
-my @tmp = @input_buf;
+my @tmp = ();
+push (@tmp,@input_buf);
 undef @input_buf;
 $pm->start and next;
-db_log_debug($dbh,"Started child $child_count") if ($debug);
 my $ret = calc_stats(\@tmp);
 $pm->finish(0, \$ret);
-db_log_debug($dbh,"Stopped child $child_count") if ($debug);
 }
 
 if (scalar(@input_buf)) {
+    $child_count ++;
     $pm->start;
     my $ret = calc_stats(\@input_buf);
     $pm->finish(0, \$ret);
@@ -144,7 +145,12 @@ sub calc_stats {
 
 my $lines = shift;
 
+return if (!$lines or !scalar @$lines);
+
 my $f_dbh = init_db();
+
+db_log_debug($f_dbh,"Started child $child_count for ".scalar @$lines." lines count") if ($debug);
+
 my $lines_stats;
 
 $lines_stats->{pkt}{all}=0;
@@ -185,7 +191,8 @@ my $l_dst_ip_aton=StrToIp($l_dst_ip);
 my ($sec,$min,$hour,$day,$month,$year,$zone) = (localtime($l_time))[0,1,2,3,4,5];
 $month++;
 $year += 1900;
-my $full_time = $f_dbh->quote(sprintf "%04d-%02d-%02d %02d:%02d:%02d",$year,$month,$day,$hour,$min,$sec);
+#my $full_time = $f_dbh->quote(sprintf "%04d-%02d-%02d %02d:%02d:%02d",$year,$month,$day,$hour,$min,$sec);
+my $full_time = sprintf "%04d-%02d-%02d %02d:%02d:%02d",$year,$month,$day,$hour,$min,$sec;
 
 my $user_ip;
 my $auth_id;
@@ -213,14 +220,13 @@ if ($users->match_string($l_dst_ip)) {
     }
 
 my $auth_id;
+if ($user_ip) { $auth_id = $users->match_string($user_ip); } else { $auth_id = 0; }
 
 #save full packet
-if ($save_detail)  {
-    if (($user_ip and $user_stats{$user_ip}{save_traf}) or (!$auth_id and $config_ref{save_detail})) {
-        if ($user_ip) { $auth_id = $users->match_string($user_ip); }
-        if (!$auth_id) { $auth_id = 0; }
-        push(@{$lines_stats->{sql}},"INSERT INTO Traffic_detail (auth_id,router_id,timestamp,proto,src_ip,dst_ip,src_port,dst_port,bytes,pkt) VALUES($auth_id,$router_id,$full_time,'$l_proto',$l_src_ip_aton,$l_dst_ip_aton,'$l_src_port','$l_dst_port','$l_bytes','$l_packets')");
-        }
+if ($config_ref{save_detail})  {
+    my @detail_array = ($auth_id,$router_id,$full_time,$l_proto,$l_src_ip_aton,$l_dst_ip_aton,$l_src_port,$l_dst_port,$l_bytes,$l_packets);
+    if ($auth_id and $user_stats{$user_ip}{save_traf}) { push(@{$lines_stats->{sql}},\@detail_array); }
+    if (!$auth_id and $config_ref{add_unknown_user}) { push(@{$lines_stats->{sql}},\@detail_array); }
     }
 
 if ($auth_id) { next; }
@@ -248,8 +254,15 @@ if ($user_ip eq $l_src_ip) {
     }
 }
 
-$f_dbh->disconnect;
+db_log_debug($f_dbh,"Stopped child $child_count analyze data") if ($debug);
+
+if (scalar(@{$lines_stats->{sql}})) { 
+        db_log_debug($f_dbh,"Start write traffic detail to DB. ".scalar @{$lines_stats->{sql}}." lines count") if ($debug);
+	batch_db_sql_cached("INSERT INTO Traffic_detail (auth_id,router_id,timestamp,proto,src_ip,dst_ip,src_port,dst_port,bytes,pkt) VALUES(?,?,?,?,?,?,?,?,?,?)",\@{$lines_stats->{sql}});
+        db_log_debug($f_dbh,"Write traffic detail to DB stopped") if ($debug);
+	}
 
+$f_dbh->disconnect;
 return $lines_stats;
 }
 
@@ -312,7 +325,6 @@ $tSQL="UPDATE User_stats SET byte_in='".$hour_stat->{byte_in}."', byte_out='".$h
 push (@batch_sql_traf,$tSQL);
 }
 
-print Dumper(\@batch_sql_traf);
 batch_db_sql($m_dbh,\@batch_sql_traf);
 
 db_log_debug($m_dbh,"Import traffic from router id: $router_id stop") if ($debug);