فهرست منبع

add parallel analyze netflow data
add parallel programming routers

rajven 4 سال پیش
والد
کامیت
e7d0e7f246
3فایلهای تغییر یافته به همراه209 افزوده شده و 149 حذف شده
  1. 1 1
      html/admin/devices/index.php
  2. 196 147
      scripts/parse_flow.pl
  3. 12 1
      scripts/sync_mikrotik.pl

+ 1 - 1
html/admin/devices/index.php

@@ -52,7 +52,7 @@ if (!empty($sort_field) and !empty($order)) { $sort_sql = " ORDER BY $sort_field
 <td class="info" colspan=2 > <?php  print_devtypes_select($db_link, "devtypes", $f_devtype_id); ?>
 <td class="info" >Показать оборудование из</td>
 <td class="info" > <?php  print_building_select($db_link, "building_id", $f_building_id); ?></td>
-<td class="info" colspan=3> <input type="submit" onclick="return confirm('Применить?')" name="apply" value="Apply"></td>
+<td class="info" colspan=3> <input type="submit" name="apply" value="Показать"></td>
 </tr>
 <tr align="center">
 <td><input type="checkbox" onClick="checkAll(this.checked);"></td>

+ 196 - 147
scripts/parse_flow.pl

@@ -4,6 +4,8 @@
 # Copyright (C) Roman Dmitiriev, rnd@rajven.ru
 #
 
+use English;
+use base;
 use FindBin '$Bin';
 use lib "$Bin/";
 use strict;
@@ -17,10 +19,10 @@ use Rstat::config;
 use Rstat::main;
 use Rstat::net_utils;
 use Rstat::mysql;
+use Parallel::ForkManager;
 
 setpriority(0,0,19);
 
-
 my $router_id;
 if (scalar @ARGV>1) { $router_id=shift(@ARGV); } else { $router_id=$ARGV[0]; }
 
@@ -29,13 +31,14 @@ if (!$router_id) {
     exit 110;
     }
 
+my $fork_count = $cpu_count*10;
+
 my $timeshift = get_option($dbh,55)*60;
 
 db_log_debug($dbh,"Import traffic from router id: $router_id start. Timestep $timeshift sec.") if ($debug);
 
 my %stats;
 $stats{pkt}{all}=0;
-$stats{pkt}{user}=0;
 $stats{pkt}{user_in}=0;
 $stats{pkt}{user_out}=0;
 $stats{pkt}{free}=0;
@@ -52,219 +55,265 @@ my $users = new Net::Patricia;
 InitSubnets();
 
 #get userid list
-my $user_auth_list = $dbh->prepare( "SELECT id,ip,user_id,save_traf FROM User_auth where deleted=0 ORDER by user_id,ip" );
-if ( !defined $user_auth_list ) { die "Cannot prepare statement: $DBI::errstr\n"; }
-
-$user_auth_list->execute;
-
-# user auth list
-my $authlist_ref = $user_auth_list->fetchall_arrayref();
-$user_auth_list->finish();
+my @auth_list_ref = get_records_sql($dbh,"SELECT id,ip,user_id,save_traf FROM User_auth where deleted=0 ORDER by user_id");
 
 my %user_stats;
 
-foreach my $row (@$authlist_ref) {
-$users->add_string($row->[1],$row->[0]);
-$user_stats{$row->[0]}{net}=$row->[1];
-$user_stats{$row->[0]}{id}=$row->[0];
-$user_stats{$row->[0]}{user_id}=$row->[2];
-$user_stats{$row->[0]}{save_traf}=$row->[3];
-$user_stats{$row->[0]}{in}=0;
-$user_stats{$row->[0]}{out}=0;
-$user_stats{$row->[0]}{pkt_in}=0;
-$user_stats{$row->[0]}{pkt_out}=0;
+foreach my $row (@auth_list_ref) {
+$users->add_string($row->{ip},$row->{id});
+$user_stats{$row->{ip}}{net}=$row->{ip};
+$user_stats{$row->{ip}}{auth_id}=$row->{id};
+$user_stats{$row->{ip}}{user_id}=$row->{user_id};
+$user_stats{$row->{ip}}{save_traf}=$row->{save_traf};
+$user_stats{$row->{ip}}{in}=0;
+$user_stats{$row->{ip}}{out}=0;
+$user_stats{$row->{ip}}{pkt_in}=0;
+$user_stats{$row->{ip}}{pkt_out}=0;
 }
 
-my $last_time = localtime();
+my $start_time = localtime();
 
-my $time_string;
-my $dbtime;
 my $hour_date;
 my $minute_date;
 
 my @batch_sql_traf=();
 
-open(FH,"-");
+my $pm = Parallel::ForkManager->new($fork_count);
+
+$pm->run_on_finish(
+sub {
+my ($pid, $exit, $ident, $signal, $core, $data) = @_; 
+if ($data) {
+    my $dataref = ${$data};
+    foreach my $user_ip (keys $dataref->{stats}) {
+        $user_stats{$user_ip}{in} += $dataref->{stats}{$user_ip}{in};
+        $user_stats{$user_ip}{pkt_in} +=$dataref->{stats}{$user_ip}{pkt_in};
+        $user_stats{$user_ip}{out} += $dataref->{stats}{$user_ip}{out};
+        $user_stats{$user_ip}{pkt_out} +=$dataref->{stats}{$user_ip}{pkt_out};
+        }
+    $stats{pkt}{all}+=$dataref->{pkt}{all};
+    $stats{pkt}{user_in}+=$dataref->{pkt}{user_in};
+    $stats{pkt}{user_out}+=$dataref->{pkt}{user_out};
+    $stats{pkt}{free}+=$dataref->{pkt}{free};
+    $stats{pkt}{unknown}+=$dataref->{pkt}{unknown};
+    $stats{line}{all}+=$dataref->{line}{all};
+    $stats{line}{user}+=$dataref->{line}{user};
+    $stats{line}{free}+=$dataref->{line}{free};
+    $stats{line}{unknown}+=$dataref->{line}{unknown};
+    if (scalar(@{$dataref->{sql}})) {
+        $dbh=init_db();
+        $dbh->{AutoCommit} = 0;
+        my $sth;
+        foreach my $sSQL(@{$dataref->{sql}}) {
+            $sth = $dbh->prepare($sSQL);
+            $sth->execute;
+            }
+        $sth->finish;
+        $dbh->{AutoCommit} = 1;
+        }
+    }
+}
+);
+
+my @input_buf=();
+my $line_count = 0;
+my $first_step = 0;
 
-while (my $line=<FH>) {
-$stats{line}{all}++;
-#1555573194.980;17   ;     77.243.0.12;   172.20.178.71;    53; 43432;       1;     134;     2;     1
+while (my $line = <STDIN>) {
+chomp($line);
 $line=~s/\s+//g;
+if (!$first_step) {
+    my ($l_time,$l_proto,$l_src_ip,$l_dst_ip,$l_src_port,$l_dst_port,$l_packets,$l_bytes,$l_in_dev,$l_out_dev) = split(/;/,$line);
+    $start_time = $l_time;
+    $first_step = 1;
+    }
+$line_count++;
+push(@input_buf,$line);
+if ($line_count < 5000) { next; }
+$line_count = 0;
+my @tmp = @input_buf;
+undef @input_buf;
+$pm->start and next;
+my $ret = calc_stats(\@tmp);
+$pm->finish(0, \$ret);
+}
+
+if (scalar(@input_buf)) {
+    $pm->start;
+    my $ret = calc_stats(\@input_buf);
+    $pm->finish(0, \$ret);
+    }
+
+$pm->wait_all_children;
+
+undef(@input_buf);
+
+sub calc_stats {
+
+my $lines = shift;
+
+my $lines_stats;
+
+$lines_stats->{pkt}{all}=0;
+$lines_stats->{pkt}{user_in}=0;
+$lines_stats->{pkt}{user_out}=0;
+$lines_stats->{pkt}{free}=0;
+$lines_stats->{pkt}{unknown}=0;
+$lines_stats->{line}{all}=0;
+$lines_stats->{line}{user}=0;
+$lines_stats->{line}{free}=0;
+$lines_stats->{line}{unknown}=0;
+
+foreach my $line (@$lines) {
 my ($l_time,$l_proto,$l_src_ip,$l_dst_ip,$l_src_port,$l_dst_port,$l_packets,$l_bytes,$l_in_dev,$l_out_dev) = split(/;/,$line);
-$stats{pkt}{all}+=$l_packets;
 
-if (!$l_time) { $stats{line}{illegal}++; $stats{pkt}{illegal}+=$l_packets; next; }
-if ($l_src_ip eq '0.0.0.0') { $stats{line}{illegal}++; $stats{pkt}{illegal}+=$l_packets; next; }
-if ($l_dst_ip eq '0.0.0.0') { $stats{line}{illegal}++; $stats{pkt}{illegal}+=$l_packets; next; }
-if ($l_src_ip eq '255.255.255.255') { $stats{line}{illegal}++; $stats{pkt}{illegal}+=$l_packets; next; }
-if ($l_dst_ip eq '255.255.255.255') { $stats{line}{illegal}++; $stats{pkt}{illegal}+=$l_packets; next; }
-if ($Special_Nets->match_string($l_src_ip) or $Special_Nets->match_string($l_dst_ip)) { $stats{line}{illegal}++; $stats{pkt}{illegal}+=$l_packets; next; }
+$lines_stats->{pkt}{all}+=$l_packets;
+$lines_stats->{line}{all}++;
 
+if (!$l_time) { $lines_stats->{line}{illegal}++; $lines_stats->{pkt}{illegal}+=$l_packets; next; }
+if ($l_src_ip eq '0.0.0.0') { $lines_stats->{line}{illegal}++; $lines_stats->{pkt}{illegal}+=$l_packets; next; }
+if ($l_dst_ip eq '0.0.0.0') { $lines_stats->{line}{illegal}++; $lines_stats->{pkt}{illegal}+=$l_packets; next; }
+if ($l_src_ip eq '255.255.255.255') { $lines_stats->{line}{illegal}++; $lines_stats->{pkt}{illegal}+=$l_packets; next; }
+if ($l_dst_ip eq '255.255.255.255') { $lines_stats->{line}{illegal}++; $lines_stats->{pkt}{illegal}+=$l_packets; next; }
+#special networks
+if ($Special_Nets->match_string($l_src_ip) or $Special_Nets->match_string($l_dst_ip)) { $lines_stats->{line}{illegal}++; $lines_stats->{pkt}{illegal}+=$l_packets; next; }
 #unknown networks
-if (!$office_networks->match_string($l_src_ip) and !$office_networks->match_string($l_dst_ip)) { $stats{line}{illegal}++; $stats{pkt}{illegal}+=$l_packets; next; }
-
+if (!$office_networks->match_string($l_src_ip) and !$office_networks->match_string($l_dst_ip)) { $lines_stats->{line}{illegal}++; $lines_stats->{pkt}{illegal}+=$l_packets; next; }
 #local forward
-if ($office_networks->match_string($l_src_ip) and $office_networks->match_string($l_dst_ip)) { $stats{line}{free}++; $stats{line}{free}+=$l_packets; next; }
-
+if ($office_networks->match_string($l_src_ip) and $office_networks->match_string($l_dst_ip)) { $lines_stats->{line}{free}++; $lines_stats->{line}{free}+=$l_packets; next; }
 #free forward
-if ($office_networks->match_string($l_src_ip) and $free_networks->match_string($l_dst_ip)) { $stats{line}{free}++; $stats{line}{free}+=$l_packets; next; }
-if ($free_networks->match_string($l_src_ip) and $office_networks->match_string($l_dst_ip)) { $stats{line}{free}++; $stats{line}{free}+=$l_packets; next; }
+if ($office_networks->match_string($l_src_ip) and $free_networks->match_string($l_dst_ip)) { $lines_stats->{line}{free}++; $lines_stats->{line}{free}+=$l_packets; next; }
+if ($free_networks->match_string($l_src_ip) and $office_networks->match_string($l_dst_ip)) { $lines_stats->{line}{free}++; $lines_stats->{line}{free}+=$l_packets; next; }
 
 my $l_src_ip_aton=StrToIp($l_src_ip);
 my $l_dst_ip_aton=StrToIp($l_dst_ip);
 
-$last_time = $l_time;
 my ($sec,$min,$hour,$day,$month,$year,$zone) = (localtime($l_time))[0,1,2,3,4,5];
 $month++;
 $year += 1900;
+my $full_time = $dbh->quote(sprintf "%04d-%02d-%02d %02d:%02d:%02d",$year,$month,$day,$hour,$min,$sec);
 
-$time_string = sprintf "%04d-%02d-%02d %02d:%02d:%02d",$year,$month,$day,$hour,$min,$sec;
-$dbtime = $dbh->quote($time_string);
-$hour_date = $dbh->quote(sprintf "%04d-%02d-%02d %02d:00:00",$year,$month,$day,$hour);
-$minute_date = $dbh->quote(sprintf "%04d-%02d-%02d %02d:%02d:00",$year,$month,$day,$hour,$min);
+my $user_ip;
+my $auth_id;
 
-my $user_found = 0;
 # find user id
-
 if ($office_networks->match_string($l_src_ip)) {
-    my $out_user = $users->match_string($l_src_ip);
-    if ($out_user) {
-        $user_stats{$out_user}{out} += $l_bytes;
-        $user_stats{$out_user}{dbtime} = $minute_date;
-        $user_stats{$out_user}{htime} = $hour_date;
-        $user_stats{$out_user}{pkt_out} +=$l_packets;
-        $user_found = 1;
-        $stats{line}{user}++;
-        $stats{pkt}{user_out}+=$l_packets;
-        if ($save_detail and $user_stats{$out_user}{save_traf}) {
-            my $dSQL="INSERT INTO Traffic_detail (auth_id,router_id,timestamp,proto,src_ip,dst_ip,src_port,dst_port,bytes,pkt) VALUES($out_user,$router_id,$dbtime,'$l_proto',$l_src_ip_aton,$l_dst_ip_aton,'$l_src_port','$l_dst_port','$l_bytes','$l_packets')";
-            push (@batch_sql_traf,$dSQL);
-            }
-        }
+    $user_ip = $l_src_ip;
+    $lines_stats->{stats}{$user_ip}{ip}=$user_ip;
+    if (!$lines_stats->{stats}{$user_ip}{out}) { $lines_stats->{stats}{$user_ip}{out}=0; }
+    if (!$lines_stats->{stats}{$user_ip}{pkt_out}) { $lines_stats->{stats}{$user_ip}{pkt_out}=0; }
+    $lines_stats->{stats}{$user_ip}{out} += $l_bytes;
+    $lines_stats->{stats}{$user_ip}{pkt_out} +=$l_packets;
+    $lines_stats->{line}{user}++;
+    $lines_stats->{pkt}{user_out}+=$l_packets;
     }
-
 if ($office_networks->match_string($l_dst_ip)) {
-    my $in_user = $users->match_string($l_dst_ip);
-    if ($in_user) {
-        $user_stats{$in_user}{in} += $l_bytes;
-        $user_stats{$in_user}{dbtime} = $minute_date;
-        $user_stats{$in_user}{htime} = $hour_date;
-        $user_stats{$in_user}{pkt_in} +=$l_packets;
-        $stats{line}{user}++;
-        $stats{pkt}{user_in}+=$l_packets;
-        $user_found = 1;
-        if ($save_detail and $user_stats{$in_user}{save_traf}) {
-            my $dSQL="INSERT INTO Traffic_detail (auth_id,router_id,timestamp,proto,src_ip,dst_ip,src_port,dst_port,bytes,pkt) VALUES($in_user,$router_id,$dbtime,'$l_proto',$l_src_ip_aton,$l_dst_ip_aton,'$l_src_port','$l_dst_port','$l_bytes','$l_packets')";
-            push (@batch_sql_traf,$dSQL);
-            }
-        }
+    $user_ip = $l_dst_ip;
+    $lines_stats->{stats}{$user_ip}{ip}=$l_dst_ip;
+    if (!$lines_stats->{stats}{$user_ip}{in}) { $lines_stats->{stats}{$user_ip}{in}=0; }
+    if (!$lines_stats->{stats}{$user_ip}{pkt_in}) { $lines_stats->{stats}{$user_ip}{pkt_in}=0; }
+    $lines_stats->{stats}{$user_ip}{in} += $l_bytes;
+    $lines_stats->{stats}{$user_ip}{pkt_in} +=$l_packets;
+    $lines_stats->{line}{user}++;
+    $lines_stats->{pkt}{user_in}+=$l_packets;
     }
 
-if (scalar(@batch_sql_traf)>100000) {
-    $dbh->{AutoCommit} = 0;
-    my $f_sth;
-    foreach my $sSQL(@batch_sql_traf) {
-        $f_sth = $dbh->prepare($sSQL);
-        $f_sth->execute;
+my $auth_id;
+
+#save full packet
+if ($save_detail)  {
+    if (($user_ip and $user_stats{$user_ip}{save_traf}) or (!$auth_id and $config_ref{save_detail})) {
+        if ($user_ip) { $auth_id = $users->match_string($user_ip); }
+        if (!$auth_id) { $auth_id = 0; }
+        push(@{$lines_stats->{sql}},"INSERT INTO Traffic_detail (auth_id,router_id,timestamp,proto,src_ip,dst_ip,src_port,dst_port,bytes,pkt) VALUES($auth_id,$router_id,$full_time,'$l_proto',$l_src_ip_aton,$l_dst_ip_aton,'$l_src_port','$l_dst_port','$l_bytes','$l_packets')");
         }
-    $f_sth->finish;
-    $dbh->{AutoCommit} = 1;
-    @batch_sql_traf=();
     }
 
-if ($users->match_string($l_src_ip) or $users->match_string($l_dst_ip)) { next; }
-if (!$add_unknown_user) { $stats{line}{illegal}++; $stats{pkt}{illegal}+=$l_packets; next; }
+if ($auth_id) { next; }
 
-# find user ip
-my $user_ip;
-my $user_ip_aton;
-undef $user_ip;
+if (!$config_ref{add_unknown_user}) { $lines_stats->{line}{illegal}++; $lines_stats->{pkt}{illegal}+=$l_packets; next; }
+
+if (exists $lines_stats->{$l_src_ip}) { $user_ip = $l_src_ip; }
+if (exists $lines_stats->{$l_dst_ip}) { $user_ip = $l_dst_ip; }
 
 #add user by src ip only if dst not office network!!!!
 if (!$office_networks->match_string($l_dst_ip) and $office_networks->match_string($l_src_ip)) { $user_ip = $l_src_ip; }
 
-#skip unknown packet
-if (!$user_ip) { $stats{line}{illegal}++; $stats{pkt}{illegal}+=$l_packets; next; }
-
-$stats{line}{user}++;
-
-$user_ip_aton=StrToIp($user_ip);
-
-#new user
-my $auth_id=new_auth($dbh,$user_ip);
-next if (!$auth_id);
-
-db_log_warning($dbh,"New ip $user_ip added by netflow!");
+if (!$user_ip) { $lines_stats->{line}{illegal}++; $lines_stats->{pkt}{illegal}+=$l_packets; next; }
+
+if ($user_ip eq $l_src_ip) {
+    $lines_stats->{stats}{$user_ip}{ip}=$user_ip;
+    $lines_stats->{stats}{$user_ip}{auth_id}= 0;
+    if (!$lines_stats->{stats}{$user_ip}{out}) { $lines_stats->{stats}{$user_ip}{out}=0; }
+    if (!$lines_stats->{stats}{$user_ip}{pkt_out}) { $lines_stats->{stats}{$user_ip}{pkt_out}=0; }
+    $lines_stats->{stats}{$user_ip}{out} += $l_bytes;
+    $lines_stats->{stats}{$user_ip}{pkt_out} +=$l_packets;
+    $lines_stats->{line}{user}++;
+    $lines_stats->{pkt}{user_out}+=$l_packets;
+    }
 
-my $new_user = get_record_sql($dbh,"SELECT * FROM User_auth WHERE id=$auth_id");
+if ($user_ip eq $l_dst_ip) {
+    $lines_stats->{stats}{$user_ip}{ip}=$l_dst_ip;
+    $lines_stats->{stats}{$user_ip}{auth_id}=0;
+    if (!$lines_stats->{stats}{$user_ip}{in}) { $lines_stats->{stats}{$user_ip}{in}=0; }
+    if (!$lines_stats->{stats}{$user_ip}{pkt_in}) { $lines_stats->{stats}{$user_ip}{pkt_in}=0; }
+    $lines_stats->{stats}{$user_ip}{in} += $l_bytes;
+    $lines_stats->{stats}{$user_ip}{pkt_in} +=$l_packets;
+    $lines_stats->{line}{user}++;
+    $lines_stats->{pkt}{user_in}+=$l_packets;
+    }
+}
 
-$users->add_string($user_ip,$auth_id);
-$user_stats{$auth_id}{net}=$user_ip;
-$user_stats{$auth_id}{user_id}=$new_user->{user_id};
-$user_stats{$auth_id}{id}=$auth_id;
-$user_stats{$auth_id}{in}=0;
-$user_stats{$auth_id}{out}=0;
-$user_stats{$auth_id}{pkt_in}=0;
-$user_stats{$auth_id}{pkt_out}=0;
+return $lines_stats;
+}
 
-db_log_info($dbh,"Added user_auth id: $auth_id ip: $user_ip user_id: $new_user->{user_id}");
+$dbh=init_db();
 
-if ($auth_id) {
-        if ($save_detail) {
-            my $dSQL="INSERT INTO Traffic_detail (auth_id,router_id,timestamp,proto,src_ip,dst_ip,src_port,dst_port,bytes) VALUES($auth_id,$router_id,$dbtime,'$l_proto',$l_src_ip_aton,$l_dst_ip_aton,'$l_src_port','$l_dst_port','$l_bytes')";
-            push (@batch_sql_traf,$dSQL);
-            }
-        if ($l_src_ip eq $user_ip) {
-            $user_stats{$auth_id}{out} += $l_bytes;
-            $user_stats{$auth_id}{pkt_out} += $l_bytes;
-            }
-        if ($l_dst_ip eq $user_ip) {
-            $user_stats{$auth_id}{in} += $l_bytes;
-            $user_stats{$auth_id}{pkt_in} += $l_bytes;
-            }
-        $user_stats{$auth_id}{dbtime} = $minute_date;
-        $user_stats{$auth_id}{htime} = $hour_date;
-        } else {
-        undef $user_ip;
-        undef $user_ip_aton;
-        }
-}
+####################################################################################################
 
 #start hour
-my ($min,$hour,$day,$month,$year) = (localtime($last_time))[1,2,3,4,5];
-my $hour_date1 = $dbh->quote(sprintf "%04d-%02d-%02d %02d:00:00",$year+1900,$month+1,$day,$hour);
-my $flow_date = $dbh->quote(sprintf "%04d-%02d-%02d %02d:%02d:00",$year+1900,$month+1,$day,$hour,$min);
-
+my ($min,$hour,$day,$month,$year) = (localtime($start_time))[1,2,3,4,5];
+my $hour_date1 = $dbh->quote(sprintf "%04d-%02d-%02d %02d:00:00",$year+1900,$month+1,$day,$hour,0,0);
 #end hour
-($min,$hour,$day,$month,$year) = (localtime($last_time+3600))[1,2,3,4,5];
-my $hour_date2 = $dbh->quote(sprintf "%04d-%02d-%02d %02d:00:00",$year+1900,$month+1,$day,$hour);
+my $hour_date2 = $dbh->quote(sprintf "%04d-%02d-%02d %02d:00:00",$year+1900,$month+1,$day,$hour,59,59);
+#flow time
+my $flow_date = $dbh->quote(sprintf "%04d-%02d-%02d %02d:%02d:00",$year+1900,$month+1,$day,$hour,$min,1);
 
 # update database
-foreach my $row (keys %user_stats) {
-next if (!$user_stats{$row}{htime});
+foreach my $user_ip (keys %user_stats) {
+
+my $user_ip_aton=StrToIp($user_ip);
+my $auth_id = $user_stats{$user_ip}{auth_id};
+
+if (!$auth_id) {
+    $auth_id=new_auth($dbh,$user_ip);
+    $user_stats{$user_ip}{auth_id}=$auth_id;
+    #fix traffic detail for new users
+    push(@batch_sql_traf,"UPDATE Traffic_detail set auth_id=$auth_id WHERE auth_id=0 AND `timestamp`>='$hour_date1' AND `timestamp`<='$hour_date2' AND (src_ip=$user_ip_aton OR dst_ip=$user_ip_aton)");
+    }
 
 #current stats
-my $tSQL="INSERT INTO User_stats_full (timestamp,auth_id,router_id,byte_in,byte_out,pkt_in,pkt_out,step) VALUES($flow_date,'$user_stats{$row}{id}','$router_id','$user_stats{$row}{in}','$user_stats{$row}{out}','$user_stats{$row}{pkt_in}','$user_stats{$row}{pkt_out}','$timeshift')";
+my $tSQL="INSERT INTO User_stats_full (timestamp,auth_id,router_id,byte_in,byte_out,pkt_in,pkt_out,step) VALUES($flow_date,'$user_stats{$user_ip}{auth_id}','$router_id','$user_stats{$user_ip}{in}','$user_stats{$user_ip}{out}','$user_stats{$user_ip}{pkt_in}','$user_stats{$user_ip}{pkt_out}','$timeshift')";
 push (@batch_sql_traf,$tSQL);
 
 #last found timestamp
-$tSQL="UPDATE User_auth SET `last_found`=$flow_date WHERE id='$user_stats{$row}{id}'";
+$tSQL="UPDATE User_auth SET `last_found`=$flow_date WHERE id='$user_stats{$user_ip}{auth_id}'";
 push (@batch_sql_traf,$tSQL);
 
 #hour stats
 # get current stats
 my $sql = "SELECT id, byte_in, byte_out FROM User_stats
-WHERE `timestamp`>=$hour_date1 AND `timestamp`<$hour_date2 AND router_id=$router_id AND auth_id=$user_stats{$row}{id}";
+WHERE `timestamp`>=$hour_date1 AND `timestamp`<=$hour_date2 AND router_id=$router_id AND auth_id=$user_stats{$user_ip}{auth_id}";
 my $hour_stat = get_record_sql($dbh,$sql);
 if (!$hour_stat) {
-    my $dSQL="INSERT INTO User_stats (timestamp,auth_id,router_id,byte_in,byte_out) VALUES($user_stats{$row}{htime},'$user_stats{$row}{id}','$router_id','$user_stats{$row}{in}','$user_stats{$row}{out}')";
+    my $dSQL="INSERT INTO User_stats (timestamp,auth_id,router_id,byte_in,byte_out) VALUES($flow_date,'$user_stats{$user_ip}{auth_id}','$router_id','$user_stats{$user_ip}{in}','$user_stats{$user_ip}{out}')";
     push (@batch_sql_traf,$dSQL);
     next;
     }
 if (!$hour_stat->{byte_in}) { $hour_stat->{byte_in}=0; }
 if (!$hour_stat->{byte_out}) { $hour_stat->{byte_out}=0; }
-$hour_stat->{byte_in} += $user_stats{$row}{in};
-$hour_stat->{byte_out} += $user_stats{$row}{out};
+$hour_stat->{byte_in} += $user_stats{$user_ip}{in};
+$hour_stat->{byte_out} += $user_stats{$user_ip}{out};
 $tSQL="UPDATE User_stats SET byte_in='".$hour_stat->{byte_in}."', byte_out='".$hour_stat->{byte_out}."' WHERE id=".$hour_stat->{id};
 push (@batch_sql_traf,$tSQL);
 }

+ 12 - 1
scripts/sync_mikrotik.pl

@@ -21,6 +21,7 @@ use DBI;
 use utf8;
 use open ":encoding(utf8)";
 use Fcntl qw(:flock);
+use Parallel::ForkManager;
 
 #$debug = 1;
 
@@ -31,6 +32,8 @@ $|=1;
 
 if (IsNotRun($SPID)) { Add_PID($SPID); }  else { die "Warning!!! $SPID already runnning!\n"; }
 
+my $fork_count = $cpu_count*10;
+
 my @gateways =();
 #select undeleted mikrotik routers only
 if ($ARGV[0]) {
@@ -59,8 +62,14 @@ $dhcp_conf{$subnet_name}->{first_ip_aton}=StrToIp($dhcp_info->{first_ip});
 $dhcp_conf{$subnet_name}->{last_ip_aton}=StrToIp($dhcp_info->{last_ip});
 }
 
+my $pm = Parallel::ForkManager->new($fork_count);
+
 foreach my $gate (@gateways) {
 next if (!$gate);
+
+$pm->start and next;
+$dbh = init_db();
+
 my $router_name=$gate->{device_name};
 my $router_ip=$gate->{ip};
 my $shaper_enabled = $gate->{queue_enabled};
@@ -726,9 +735,11 @@ if (scalar(@cmd_list)) {
     }
 
 db_log_verbose($dbh,"Sync user state at router $router_name [".$router_ip."] stopped.");
+$dbh->disconnect();
+$pm->finish;
 }
 
-$dbh->disconnect();
+$pm->wait_all_children;
 
 if (IsMyPID($SPID)) { Remove_PID($SPID); };