|
|
@@ -21,8 +21,6 @@ use Rstat::net_utils;
|
|
|
use Rstat::mysql;
|
|
|
use Parallel::ForkManager;
|
|
|
|
|
|
-setpriority(0,0,19);
|
|
|
-
|
|
|
my $router_id;
|
|
|
if (scalar @ARGV>1) { $router_id=shift(@ARGV); } else { $router_id=$ARGV[0]; }
|
|
|
|
|
|
@@ -82,9 +80,10 @@ my $pm = Parallel::ForkManager->new($fork_count);
|
|
|
|
|
|
$pm->run_on_finish(
|
|
|
sub {
|
|
|
-my ($pid, $exit, $ident, $signal, $core, $data) = @_;
|
|
|
+my ($pid, $exit, $ident, $signal, $core, $data) = @_;
|
|
|
if ($data) {
|
|
|
my $dataref = ${$data};
|
|
|
+ my $f_dbh=init_db();
|
|
|
foreach my $user_ip (keys %{$dataref->{stats}}) {
|
|
|
$user_stats{$user_ip}{in} += $dataref->{stats}{$user_ip}{in};
|
|
|
$user_stats{$user_ip}{pkt_in} +=$dataref->{stats}{$user_ip}{pkt_in};
|
|
|
@@ -100,25 +99,20 @@ if ($data) {
|
|
|
$stats{line}{user}+=$dataref->{line}{user};
|
|
|
$stats{line}{free}+=$dataref->{line}{free};
|
|
|
$stats{line}{unknown}+=$dataref->{line}{unknown};
|
|
|
- if (scalar(@{$dataref->{sql}})) {
|
|
|
- $dbh=init_db();
|
|
|
- $dbh->{AutoCommit} = 0;
|
|
|
- my $sth;
|
|
|
- foreach my $sSQL(@{$dataref->{sql}}) {
|
|
|
- $sth = $dbh->prepare($sSQL);
|
|
|
- $sth->execute;
|
|
|
- }
|
|
|
- $sth->finish;
|
|
|
- $dbh->{AutoCommit} = 1;
|
|
|
- }
|
|
|
+ if (scalar(@{$dataref->{sql}})) { batch_db_sql($f_dbh,@{$dataref->{sql}}); }
|
|
|
+ $f_dbh->disconnect;
|
|
|
}
|
|
|
}
|
|
|
);
|
|
|
|
|
|
+$dbh->disconnect;
|
|
|
+
|
|
|
my @input_buf=();
|
|
|
my $line_count = 0;
|
|
|
my $first_step = 0;
|
|
|
|
|
|
+my $child_count = 0;
|
|
|
+
|
|
|
while (my $line = <STDIN>) {
|
|
|
chomp($line);
|
|
|
$line=~s/\s+//g;
|
|
|
@@ -131,11 +125,14 @@ $line_count++;
|
|
|
push(@input_buf,$line);
|
|
|
if ($line_count < 50000) { next; }
|
|
|
$line_count = 0;
|
|
|
+$child_count ++;
|
|
|
my @tmp = @input_buf;
|
|
|
undef @input_buf;
|
|
|
$pm->start and next;
|
|
|
+db_log_debug($dbh,"Started child $child_count") if ($debug);
|
|
|
my $ret = calc_stats(\@tmp);
|
|
|
$pm->finish(0, \$ret);
|
|
|
+db_log_debug($dbh,"Stopped child $child_count") if ($debug);
|
|
|
}
|
|
|
|
|
|
if (scalar(@input_buf)) {
|
|
|
@@ -152,6 +149,7 @@ sub calc_stats {
|
|
|
|
|
|
my $lines = shift;
|
|
|
|
|
|
+my $f_dbh = init_db();
|
|
|
my $lines_stats;
|
|
|
|
|
|
$lines_stats->{pkt}{all}=0;
|
|
|
@@ -191,13 +189,13 @@ my $l_dst_ip_aton=StrToIp($l_dst_ip);
|
|
|
my ($sec,$min,$hour,$day,$month,$year,$zone) = (localtime($l_time))[0,1,2,3,4,5];
|
|
|
$month++;
|
|
|
$year += 1900;
|
|
|
-my $full_time = $dbh->quote(sprintf "%04d-%02d-%02d %02d:%02d:%02d",$year,$month,$day,$hour,$min,$sec);
|
|
|
+my $full_time = $f_dbh->quote(sprintf "%04d-%02d-%02d %02d:%02d:%02d",$year,$month,$day,$hour,$min,$sec);
|
|
|
|
|
|
my $user_ip;
|
|
|
my $auth_id;
|
|
|
|
|
|
-# find user id
|
|
|
-if ($office_networks->match_string($l_src_ip)) {
|
|
|
+# find user
|
|
|
+if ($users->match_string($l_src_ip)) {
|
|
|
$user_ip = $l_src_ip;
|
|
|
$lines_stats->{stats}{$user_ip}{ip}=$user_ip;
|
|
|
if (!$lines_stats->{stats}{$user_ip}{out}) { $lines_stats->{stats}{$user_ip}{out}=0; }
|
|
|
@@ -207,7 +205,7 @@ if ($office_networks->match_string($l_src_ip)) {
|
|
|
$lines_stats->{line}{user}++;
|
|
|
$lines_stats->{pkt}{user_out}+=$l_packets;
|
|
|
}
|
|
|
-if ($office_networks->match_string($l_dst_ip)) {
|
|
|
+if ($users->match_string($l_dst_ip)) {
|
|
|
$user_ip = $l_dst_ip;
|
|
|
$lines_stats->{stats}{$user_ip}{ip}=$l_dst_ip;
|
|
|
if (!$lines_stats->{stats}{$user_ip}{in}) { $lines_stats->{stats}{$user_ip}{in}=0; }
|
|
|
@@ -233,11 +231,12 @@ if ($auth_id) { next; }
|
|
|
|
|
|
if (!$config_ref{add_unknown_user}) { $lines_stats->{line}{illegal}++; $lines_stats->{pkt}{illegal}+=$l_packets; next; }
|
|
|
|
|
|
-if (exists $lines_stats->{$l_src_ip}) { $user_ip = $l_src_ip; }
|
|
|
-if (exists $lines_stats->{$l_dst_ip}) { $user_ip = $l_dst_ip; }
|
|
|
-
|
|
|
#add user by src ip only if dst not office network!!!!
|
|
|
-if (!$office_networks->match_string($l_dst_ip) and $office_networks->match_string($l_src_ip)) { $user_ip = $l_src_ip; }
|
|
|
+#ignore dst traffic for create user
|
|
|
+if (!$office_networks->match_string($l_dst_ip) and $office_networks->match_string($l_src_ip)) {
|
|
|
+ $user_ip = $l_src_ip;
|
|
|
+ $users->add_string($user_ip,0);
|
|
|
+ }
|
|
|
|
|
|
if (!$user_ip) { $lines_stats->{line}{illegal}++; $lines_stats->{pkt}{illegal}+=$l_packets; next; }
|
|
|
|
|
|
@@ -251,33 +250,26 @@ if ($user_ip eq $l_src_ip) {
|
|
|
$lines_stats->{line}{user}++;
|
|
|
$lines_stats->{pkt}{user_out}+=$l_packets;
|
|
|
}
|
|
|
-
|
|
|
-if ($user_ip eq $l_dst_ip) {
|
|
|
- $lines_stats->{stats}{$user_ip}{ip}=$l_dst_ip;
|
|
|
- $lines_stats->{stats}{$user_ip}{auth_id}=0;
|
|
|
- if (!$lines_stats->{stats}{$user_ip}{in}) { $lines_stats->{stats}{$user_ip}{in}=0; }
|
|
|
- if (!$lines_stats->{stats}{$user_ip}{pkt_in}) { $lines_stats->{stats}{$user_ip}{pkt_in}=0; }
|
|
|
- $lines_stats->{stats}{$user_ip}{in} += $l_bytes;
|
|
|
- $lines_stats->{stats}{$user_ip}{pkt_in} +=$l_packets;
|
|
|
- $lines_stats->{line}{user}++;
|
|
|
- $lines_stats->{pkt}{user_in}+=$l_packets;
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
+$f_dbh->disconnect;
|
|
|
+
|
|
|
return $lines_stats;
|
|
|
}
|
|
|
|
|
|
-$dbh=init_db();
|
|
|
+my $m_dbh=init_db();
|
|
|
|
|
|
####################################################################################################
|
|
|
|
|
|
#start hour
|
|
|
my ($min,$hour,$day,$month,$year) = (localtime($start_time))[1,2,3,4,5];
|
|
|
-my $hour_date1 = $dbh->quote(sprintf "%04d-%02d-%02d %02d:00:00",$year+1900,$month+1,$day,$hour,0,0);
|
|
|
-#end hour
|
|
|
-my $hour_date2 = $dbh->quote(sprintf "%04d-%02d-%02d %02d:00:00",$year+1900,$month+1,$day,$hour,59,59);
|
|
|
#flow time
|
|
|
-my $flow_date = $dbh->quote(sprintf "%04d-%02d-%02d %02d:%02d:00",$year+1900,$month+1,$day,$hour,$min,1);
|
|
|
+my $flow_date = $m_dbh->quote(sprintf "%04d-%02d-%02d %02d:%02d:%02d",$year+1900,$month+1,$day,$hour,$min,1);
|
|
|
+#start stat time
|
|
|
+my $hour_date1 = $m_dbh->quote(sprintf "%04d-%02d-%02d %02d:00:00",$year+1900,$month+1,$day,$hour);
|
|
|
+#end hour
|
|
|
+($hour,$day,$month,$year) = (localtime($start_time+3600))[2,3,4,5];
|
|
|
+my $hour_date2 = $m_dbh->quote(sprintf "%04d-%02d-%02d %02d:00:00",$year+1900,$month+1,$day,$hour);
|
|
|
|
|
|
# update database
|
|
|
foreach my $user_ip (keys %user_stats) {
|
|
|
@@ -286,10 +278,10 @@ my $user_ip_aton=StrToIp($user_ip);
|
|
|
my $auth_id = $user_stats{$user_ip}{auth_id};
|
|
|
|
|
|
if (!$auth_id) {
|
|
|
- $auth_id=new_auth($dbh,$user_ip);
|
|
|
+ $auth_id=new_auth($m_dbh,$user_ip);
|
|
|
$user_stats{$user_ip}{auth_id}=$auth_id;
|
|
|
#fix traffic detail for new users
|
|
|
- push(@batch_sql_traf,"UPDATE Traffic_detail set auth_id=$auth_id WHERE auth_id=0 AND `timestamp`>='$hour_date1' AND `timestamp`<='$hour_date2' AND (src_ip=$user_ip_aton OR dst_ip=$user_ip_aton)");
|
|
|
+ push(@batch_sql_traf,"UPDATE Traffic_detail set auth_id=$auth_id WHERE auth_id=0 AND `timestamp`>='$hour_date1' AND `timestamp`<'$hour_date2' AND (src_ip=$user_ip_aton OR dst_ip=$user_ip_aton)");
|
|
|
}
|
|
|
|
|
|
#current stats
|
|
|
@@ -303,8 +295,8 @@ push (@batch_sql_traf,$tSQL);
|
|
|
#hour stats
|
|
|
# get current stats
|
|
|
my $sql = "SELECT id, byte_in, byte_out FROM User_stats
|
|
|
-WHERE `timestamp`>=$hour_date1 AND `timestamp`<=$hour_date2 AND router_id=$router_id AND auth_id=$user_stats{$user_ip}{auth_id}";
|
|
|
-my $hour_stat = get_record_sql($dbh,$sql);
|
|
|
+WHERE `timestamp`>=$hour_date1 AND `timestamp`<$hour_date2 AND router_id=$router_id AND auth_id=$user_stats{$user_ip}{auth_id}";
|
|
|
+my $hour_stat = get_record_sql($m_dbh,$sql);
|
|
|
if (!$hour_stat) {
|
|
|
my $dSQL="INSERT INTO User_stats (timestamp,auth_id,router_id,byte_in,byte_out) VALUES($flow_date,'$user_stats{$user_ip}{auth_id}','$router_id','$user_stats{$user_ip}{in}','$user_stats{$user_ip}{out}')";
|
|
|
push (@batch_sql_traf,$dSQL);
|
|
|
@@ -318,26 +310,19 @@ $tSQL="UPDATE User_stats SET byte_in='".$hour_stat->{byte_in}."', byte_out='".$h
|
|
|
push (@batch_sql_traf,$tSQL);
|
|
|
}
|
|
|
|
|
|
-$dbh->{AutoCommit} = 0;
|
|
|
-my $sth;
|
|
|
-foreach my $sSQL(@batch_sql_traf) {
|
|
|
-$sth = $dbh->prepare($sSQL);
|
|
|
-$sth->execute;
|
|
|
-}
|
|
|
-$sth->finish;
|
|
|
-$dbh->{AutoCommit} = 1;
|
|
|
+batch_db_sql($m_dbh,@batch_sql_traf);
|
|
|
|
|
|
-db_log_debug($dbh,"Import traffic from router id: $router_id stop") if ($debug);
|
|
|
+db_log_debug($m_dbh,"Import traffic from router id: $router_id stop") if ($debug);
|
|
|
|
|
|
-db_log_verbose($dbh,"Recalc quotes started");
|
|
|
-recalc_quotes($dbh,$router_id);
|
|
|
-db_log_verbose($dbh,"Recalc quotes stopped");
|
|
|
+db_log_verbose($m_dbh,"Recalc quotes started");
|
|
|
+recalc_quotes($m_dbh,$router_id);
|
|
|
+db_log_verbose($m_dbh,"Recalc quotes stopped");
|
|
|
|
|
|
-db_log_verbose($dbh,"router id: $router_id stop Traffic statistics, lines: all => $stats{line}{all}, user=> $stats{line}{user}, free => $stats{line}{free}, illegal=> $stats{line}{illegal}");
|
|
|
-db_log_verbose($dbh,sprintf("router id: %d stop Traffic speed, line/s: all => %.2f, user=> %.2f, free => %.2f, unknown=> %.2f", $router_id, $stats{line}{all}/$timeshift, $stats{line}{user}/$timeshift, $stats{line}{free}/$timeshift, $stats{line}{illegal}/$timeshift));
|
|
|
-db_log_verbose($dbh,"router id: $router_id stop Traffic statistics, pkt: all => $stats{pkt}{all}, user_in=> $stats{pkt}{user_in}, user_in=> $stats{pkt}{user_out}, free => $stats{pkt}{free}, illegal=> $stats{pkt}{illegal}");
|
|
|
-db_log_verbose($dbh,sprintf("router id: %d stop Traffic speed, pkt/s: all => %.2f, user_in=> %.2f, user_out=> %.2f, free => %.2f, unknown=> %.2f", $router_id, $stats{pkt}{all}/$timeshift, $stats{pkt}{user_in}/$timeshift, $stats{pkt}{user_out}/$timeshift, $stats{pkt}{free}/$timeshift, $stats{pkt}{illegal}/$timeshift));
|
|
|
+db_log_verbose($m_dbh,"router id: $router_id stop Traffic statistics, lines: all => $stats{line}{all}, user=> $stats{line}{user}, free => $stats{line}{free}, illegal=> $stats{line}{illegal}");
|
|
|
+db_log_verbose($m_dbh,sprintf("router id: %d stop Traffic speed, line/s: all => %.2f, user=> %.2f, free => %.2f, unknown=> %.2f", $router_id, $stats{line}{all}/$timeshift, $stats{line}{user}/$timeshift, $stats{line}{free}/$timeshift, $stats{line}{illegal}/$timeshift));
|
|
|
+db_log_verbose($m_dbh,"router id: $router_id stop Traffic statistics, pkt: all => $stats{pkt}{all}, user_in=> $stats{pkt}{user_in}, user_in=> $stats{pkt}{user_out}, free => $stats{pkt}{free}, illegal=> $stats{pkt}{illegal}");
|
|
|
+db_log_verbose($m_dbh,sprintf("router id: %d stop Traffic speed, pkt/s: all => %.2f, user_in=> %.2f, user_out=> %.2f, free => %.2f, unknown=> %.2f", $router_id, $stats{pkt}{all}/$timeshift, $stats{pkt}{user_in}/$timeshift, $stats{pkt}{user_out}/$timeshift, $stats{pkt}{free}/$timeshift, $stats{pkt}{illegal}/$timeshift));
|
|
|
|
|
|
-$dbh->disconnect;
|
|
|
+$m_dbh->disconnect;
|
|
|
|
|
|
exit 0;
|