Преглед на файлове

bugfix: fixed incorrect operation of the netflow collector

Dmitriev Roman преди 3 месеца
родител
ревизия
29ab2aca36
променени са 2 файла, в които са добавени 197 реда и са изтрити 119 реда
  1. 168 100
      scripts/eye-statd.pl
  2. 29 19
      scripts/eyelib/main.pm

+ 168 - 100
scripts/eye-statd.pl

@@ -22,6 +22,7 @@ use eyelib::database;
 use eyelib::common;
 use eyelib::snmp;
 use Socket qw(AF_INET6 inet_ntop);
+use POSIX ":sys_wait_h";
 use IO::Socket;
 
 my $proc_name = $MY_NAME;
@@ -48,7 +49,11 @@ my %wan_dev;
 my %lan_dev;
 
 my @traffic = ();
-my $saving = 0;
+our $child_count = 0;
+
+my $check_interval = 10;
+my $reload_config = 0;
+our $flush_scheduled = 0;
 
 #user statistics for cached data
 my %user_stats;
@@ -57,9 +62,9 @@ my %wan_stats;
 my $MAXREAD = 9216;
 
 my $timeshift = get_option($dbh,55)*60;
-my $save_path =  get_option($dbh,72);
+if (!$timeshift or $timeshift<60) { $timeshift = 60; }
 
-my $thread_count = $cpu_count;
+my $save_path =  get_option($dbh,72);
 
 #save traffic to DB
 my $traf_lastflush = time();
@@ -75,8 +80,7 @@ my $netflow9_templates = {};
 $SIG{CHLD} = \&REAPER;
 $SIG{TERM} = \&TERM;
 $SIG{INT} = \&TERM;
-$SIG{HUP} = \&INIT;
-
+$SIG{HUP} = sub { $reload_config = 1; };
 
 if (IsNotRun($pid_file)) {
     Add_PID($pid_file);
@@ -86,9 +90,11 @@ if (IsNotRun($pid_file)) {
     }
 
 sub REAPER {
-	wait;
-	$saving = 0;
-	$SIG{CHLD} = \&REAPER;
+    while (waitpid(-1, WNOHANG) > 0) {  # SIGCHLD handler: collect every already-exited child without blocking
+        $child_count--;  # Decrement the live-child counter once per reaped child
+    }
+#    wait;
+    $SIG{CHLD} = \&REAPER;  # re-install this handler for the next SIGCHLD
 }
 
 sub TERM {
@@ -99,59 +105,97 @@ sub TERM {
 	exit 0;
 }
 
-sub INIT {
+sub refresh_config {  # (Re)loads options, routers, L3 interfaces and the user list from the database into the global caches
 
-# Create new database handle. If we can't connect, die()
-my $hdb = init_db();
+    log_verbose("Reloading configuration from database...");
 
-InitSubnets();
+    # Create new database handle. If we can't connect, die()
+    my $hdb = init_db();
 
-init_option($hdb);
+    log_debug("Database connection established");
 
-#a directory for storing traffic details in text form
-$save_path = get_option($hdb,72);
+    InitSubnets();
+    log_debug("Subnets initialized");
 
-#the period for resetting statistics from netflow to billing
-$timeshift = get_option($hdb,55)*60;
+    init_option($hdb);
+    log_debug("Global options reloaded");
 
-@router_ref = get_records_sql($hdb,"SELECT * FROM devices WHERE deleted=0 AND device_type=2 AND snmp_version>0 ORDER by ip" );
-@interfaces = get_records_sql($hdb,"SELECT * FROM device_l3_interfaces ORDER by device_id" );
+    # a directory for storing traffic details in text form
+    $save_path = get_option($hdb, 72);
 
-#router device_id by known device ip
-foreach my $row (@router_ref) {
-    setCommunity($row);
-    $routers{$row->{id}}=$row;
-    my $l3_list = getIpAdEntIfIndex($row->{ip},$row->{snmp});
+    # the period for resetting statistics from netflow to billing
+    my $timeshift_minutes = get_option($hdb, 55);
+    $timeshift = $timeshift_minutes * 60;  # NOTE(review): the 60-second minimum applied where $timeshift is first initialized is not re-applied here; confirm a reload cannot leave it at 0
+    log_verbose("Flush interval set to $timeshift seconds ($timeshift_minutes minutes)");
 
-    #create hash for interface snmp index => ip-address at interface =1;
-    foreach my $router_ip (keys %$l3_list) { $routers_svi{$row->{id}}{$l3_list->{$router_ip}}{$router_ip}=1; }
+    @router_ref = get_records_sql($hdb, "SELECT * FROM devices WHERE deleted=0 AND device_type=2 AND snmp_version>0 ORDER by ip");
+    log_verbose("Loaded " . scalar(@router_ref) . " active routers");
 
-    #create hash by all ip-addresses for router
-    foreach my $router_ip (keys %$l3_list) {
-        $routers_by_ip{$router_ip}->{id}=$row->{id};
-        if ($config_ref{save_detail}) { 
-            $routers_by_ip{$router_ip}->{save}=$row->{netflow_save};
-            } else { $routers_by_ip{$router_ip}->{save}=0; }
-        }
-    }
+    @interfaces = get_records_sql($hdb, "SELECT * FROM device_l3_interfaces ORDER by device_id");
+    log_verbose("Loaded " . scalar(@interfaces) . " L3 interfaces");
 
-#snmp index for WAN/LAN interface by device id
-foreach my $row (@interfaces) {
-    if ($row->{interface_type}) { $wan_dev{$row->{device_id}}{$row->{snmpin}}=1; } else { $lan_dev{$row->{device_id}}{$row->{snmpin}}=1; }
-    }
+    # Clear all router-related caches
+    %routers_svi = ();
+    %routers_by_ip = ();
+    %routers = ();
 
-#get userid list
-my @auth_list_ref = get_records_sql($hdb,"SELECT id,ip,save_traf FROM user_auth where deleted=0 ORDER by id");
+    # Rebuild router data
+    foreach my $row (@router_ref) {
+        setCommunity($row);
+        $routers{$row->{id}} = $row;
+        my $l3_list = getIpAdEntIfIndex($row->{ip}, $row->{snmp});
 
-foreach my $row (@auth_list_ref) {
-    $user_stats{$row->{ip}}{auth_id}=$row->{id};
-    if ($config_ref{save_detail}) {
-        $user_stats{$row->{ip}}{save_traf}=$row->{save_traf};
+        foreach my $router_ip (keys %$l3_list) {
+            $routers_svi{$row->{id}}{$l3_list->{$router_ip}}{$router_ip} = 1;
+        }
+
+        foreach my $router_ip (keys %$l3_list) {
+            $routers_by_ip{$router_ip}->{id} = $row->{id};
+            if ($config_ref{save_detail}) {
+                $routers_by_ip{$router_ip}->{save} = $row->{netflow_save};
+            } else {
+                $routers_by_ip{$router_ip}->{save} = 0;
+            }
+        }
+    }
+    log_debug("Router IP mappings rebuilt");
+
+    # Clear and rebuild WAN/LAN interface mappings
+    %wan_dev = ();
+    %lan_dev = ();
+    foreach my $row (@interfaces) {
+        if ($row->{interface_type}) {
+            $wan_dev{$row->{device_id}}{$row->{snmpin}} = 1;
         } else {
-        $user_stats{$row->{ip}}{save_traf}=0;
+            $lan_dev{$row->{device_id}}{$row->{snmpin}} = 1;
         }
     }
-$hdb->disconnect();
+    log_debug("WAN/LAN interface mappings rebuilt");
+
+    # Reload user list — FULL RESET of %user_stats
+    my @auth_list_ref = get_records_sql($hdb, "SELECT id,ip,save_traf FROM user_auth WHERE deleted=0 ORDER BY id");
+    log_verbose("Loaded " . scalar(@auth_list_ref) . " active user ip-addresses");
+
+    my $save_traf_count = 0;
+    %user_stats = ();
+    foreach my $row (@auth_list_ref) {
+        $user_stats{$row->{ip}}{auth_id} = $row->{id};
+        if ($config_ref{save_detail}) {
+            $user_stats{$row->{ip}}{save_traf} = $row->{save_traf};
+        } else {
+            $user_stats{$row->{ip}}{save_traf} = 0;
+        }
+        $save_traf_count++ if ($user_stats{$row->{ip}}{save_traf});
+        $user_stats{$row->{ip}}{in}        = 0;
+        $user_stats{$row->{ip}}{out}       = 0;
+        $user_stats{$row->{ip}}{pkt_in}    = 0;
+        $user_stats{$row->{ip}}{pkt_out}   = 0;
+        $user_stats{$row->{ip}}{last_found}= 0;
+    }
+    log_verbose("Found " . $save_traf_count . " active ip-addresses with full save traffic log");
+
+    $hdb->disconnect();
+    log_verbose("Configuration reload completed. All runtime statistics cleared.");
 }
 
 ############### MAIN ##########################
@@ -159,7 +203,7 @@ $hdb->disconnect();
 #close default database
 $dbh->disconnect();
 
-INIT();
+refresh_config();
 
 my $lsn_nflow;
 my $sel = IO::Select->new();
@@ -173,27 +217,42 @@ if ($server_port > 0) {
 
 my ($him,$datagram,$flags);
 
-# main datagram receive loop
+
 while (1) {
-	while (my @ready = $sel->can_read) {
-		foreach my $server (@ready) {
-			$him = $server->recv($datagram, $MAXREAD);
-			next if (!$him);
-			
-			my ($port, $ipaddr) = sockaddr_in($server->peername);
-			
-			if (defined($lsn_nflow) && $server == $lsn_nflow) {
-				my ($version) = unpack("n", $datagram);
-                                if ($version == 5) {
-                                        parse_netflow_v5($datagram, $ipaddr);
-                                } elsif ($version == 9) {
-                                        parse_netflow_v9($datagram, $ipaddr);
-                                } else {
-                                        print "unknown NetFlow version: $version\n";
-                                }
-			}
-		}
-	}
+    # Wait for new packets for NO longer than $check_interval seconds
+    my @ready = $sel->can_read($check_interval);
+    # Process every socket that is ready for reading
+    foreach my $server (@ready) {
+        $him = $server->recv($datagram, $MAXREAD);
+        next if (!$him);
+        my ($port, $ipaddr) = sockaddr_in($server->peername);
+
+        if (defined($lsn_nflow) && $server == $lsn_nflow) {
+            my ($version) = unpack("n", $datagram);
+            if ($version == 5) {
+                parse_netflow_v5($datagram, $ipaddr);
+            } elsif ($version == 9) {
+                parse_netflow_v9($datagram, $ipaddr);
+            } else {
+                print "unknown NetFlow version: $version\n";
+            }
+        }
+    }
+    # Check whether it is time to flush the accumulated statistics
+    my $elapsed = time() - $traf_lastflush;
+    if (@traffic && $elapsed >= $timeshift) {
+        unless ($flush_scheduled) {
+            $flush_scheduled = 1;  # NOTE(review): flush_traffic() returns early without clearing this flag while $child_count > 0; confirm flushing resumes once the child exits
+            log_verbose("Check flush time. Now: ".time()." Last: $traf_lastflush Diff: $elapsed");
+            flush_traffic(0);
+        }
+    }
+
+    if ($reload_config) {  # set asynchronously by the HUP signal handler
+        $reload_config = 0;
+        log_verbose("Reloading config due to HUP signal");
+        refresh_config();
+        }
 }
 
 sub parse_netflow_v5 {
@@ -374,7 +433,6 @@ sub parse_netflow_v9_data_flowset {
             }
             # ICMP_TYPE (32)
             elsif ($field_type == 32) {
-                $flow{icmp_type} = unpack("C", $value);
             }
             # ICMP_CODE (33)
             elsif ($field_type == 33) {
@@ -461,58 +519,69 @@ sub save_flow {
 	if (exists $wan_dev{$router_id}->{$flow->{snmp_out}}) { $flow->{direction} = 1; }
 
 	push(@traffic,$flow);
-	flush_traffic(0);
 }
 
 sub flush_traffic {
-
 my $force = shift || 0;
 
-if (!$force && ($saving || ((time - $traf_lastflush) < $timeshift))) { return; }
+if (!$force && ($child_count > 0 or $flush_scheduled==0)) {
+        log_verbose("Child exists ($child_count), skipping fork");
+        return;
+    }
 
-$saving++;
 
 my $pid = fork();
 
-INIT();
-
-log_verbose("Start flush traffic to DB");
-log_debug("ROUTERS-SVI:".Dumper(\%routers_svi));
-log_debug("ROUTERS by IP::".Dumper(\%routers_by_ip));
-log_debug("ROUTERS:".Dumper(\%routers));
-log_debug("WAN-DEVS:".Dumper(\%wan_dev));
-log_debug("LAN-DEVS:".Dumper(\%lan_dev));
+refresh_config();
 
 if (!defined $pid) {
-    $saving = 0;
-    print "cannot fork! Save traffic and exit...\n";
-    } elsif ($pid != 0) {
-        # in parent
-	$traf_lastflush = time();
-	#clean main cache
-	@traffic = ();
+        log_error("Fork failed: $!");
+        $flush_scheduled = 0;
         return;
     }
 
+if ($pid>0) {
+        log_verbose("Parent $$: forked child $pid");
+        $child_count++;
+        # Reset the statistics
+        $traf_lastflush = time();
+        @traffic = ();
+        $flush_scheduled = 0;
+        return;
+        }
 
 #create oper-cache
 my @flush_table = ();
-
 push(@flush_table,@traffic);
+@traffic = ();
+
+$SIG{HUP}  = 'IGNORE';
+$SIG{TERM} = 'DEFAULT';
+
+log_verbose("Start flush traffic to DB");
+log_debug("ROUTERS-SVI:".Dumper(\%routers_svi));
+log_debug("ROUTERS by IP::".Dumper(\%routers_by_ip));
+log_debug("ROUTERS:".Dumper(\%routers));
+log_debug("WAN-DEVS:".Dumper(\%wan_dev));
+log_debug("LAN-DEVS:".Dumper(\%lan_dev));
+
+log_verbose("Child $$: Start flush traffic to DB");
 
 my $hdb=init_db();
 
 #saved packet by users
-my @detail_traffic = ();
+my @detail_traffic=();
 my %saved_netflow = ();
 
-my %routers_found;
+%wan_stats = ();
+
+my %routers_found = ();
 
 #last packet timestamp
 my $last_time = time();
 my $start_time;
 
-log_debug("Netflow statistics calculation started");
+log_verbose("Netflow statistics calculation started for ".scalar @flush_table ." records");
 
 foreach my $traf_record (@flush_table) {
 
@@ -609,7 +678,7 @@ if (exists $wan_dev{$router_id}->{$traf_record->{snmp_out}} and exists $wan_dev{
             if (exists $wan_stats{$router_id}{$traf_record->{snmp_out}}{forward_out}) {
                 $wan_stats{$router_id}{$traf_record->{snmp_out}}{forward_out}+=$traf_record->{octets};
                 } else {
-                $wan_stats{$router_id}{$traf_record->{snmp_out}}{forward_out}+=$traf_record->{octets};
+                $wan_stats{$router_id}{$traf_record->{snmp_out}}{forward_out}=$traf_record->{octets};
                 }
             } else {
             #log_debug("ROUTER id: $router_id FI-DATA: ".hash_to_kv_csv($traf_record));
@@ -617,7 +686,7 @@ if (exists $wan_dev{$router_id}->{$traf_record->{snmp_out}} and exists $wan_dev{
             if (exists $wan_stats{$router_id}{$traf_record->{snmp_in}}{forward_in}) {
                 $wan_stats{$router_id}{$traf_record->{snmp_in}}{forward_in}+=$traf_record->{octets};
                 } else {
-                $wan_stats{$router_id}{$traf_record->{snmp_in}}{forward_in}+=$traf_record->{octets};
+                $wan_stats{$router_id}{$traf_record->{snmp_in}}{forward_in}=$traf_record->{octets};
                 }
             }
         } else {
@@ -710,7 +779,7 @@ $last_time = $traf_record->{starttime};
 
 $user_stats{$user_ip}{last_found} = $last_time;
 
-next if (!$config_ref{save_detail} and !$user_stats{$user_ip}{save_traf});
+next unless ( $config_ref{save_detail} and $user_stats{$user_ip}{save_traf});
 
 my $l_src_ip_aton=StrToIp($l_src_ip);
 my $l_dst_ip_aton=StrToIp($l_dst_ip);
@@ -730,6 +799,7 @@ push @detail_traffic, [
     $traf_record->{octets},
     $traf_record->{pkts}
     ];
+
 }
 
 log_debug("The netflow statistics calculation is finished");
@@ -932,21 +1002,19 @@ if ($config_ref{enable_quotes}) {
     }
 
 if (scalar(@detail_traffic)) {
-    db_log_debug($hdb,"Start write traffic detail to DB. ".scalar @detail_traffic." lines count") if ($debug);
+    log_verbose("Start write traffic detail to DB. ".scalar @detail_traffic." lines count");
     my $traffic_fields = ['auth_id', 'router_id', 'ts', 'proto', 'src_ip', 'dst_ip', 'src_port', 'dst_port', 'bytes', 'pkt'];
     unshift @detail_traffic, $traffic_fields;
     batch_db_sql_csv("traffic_detail",\@detail_traffic);
-    @detail_traffic = ();
-    db_log_debug($hdb,"Write traffic detail to DB stopped") if ($debug);
+    undef @detail_traffic;
+    log_verbose("Write traffic detail is finished");
     }
 
 $hdb->disconnect();
 
-$saving = 0;
-
 log_verbose("Flush traffic to DB is finished");
 
-exit;
+exit 0;
 }
 
 if (IsMyPID($pid_file)) { Remove_PID($pid_file); }

+ 29 - 19
scripts/eyelib/main.pm

@@ -117,25 +117,35 @@ sub hasNotifyFlag {
 #---------------------------------------------------------------------------------------------------------
 
 sub log_file {
-return if (!$_[0]);
-return if (!$_[1]);
-return if (!$_[2]);
-open (LG,">>$_[0]") || die("Error open log file $_[0]!!! die...");
-my ($sec,$min,$hour,$mday,$mon,$year) = (localtime())[0,1,2,3,4,5];
-$mon += 1; $year += 1900;
-my @msg = split("\n",$_[2]);
-foreach my $row (@msg) {
-	next if (!$row);
-	printf LG "%04d%02d%02d-%02d%02d%02d %s [%d] %s\n",$year,$mon,$mday,$hour,$min,$sec,$_[1],$$,$row;
-	}
-close (LG);
-if ($< ==0) {
-    my $uid = getpwnam $log_owner_user;
-    my $gid = getgrnam $log_owner_user;
-    if (!$gid) { $gid=getgrnam "root"; }
-    if (!$uid) { $uid=getpwnam "root"; }
-    chown $uid, $gid, $_[0];
-    chmod oct("0660"), $_[0];
+    return if (!$_[0]);  # $_[0]: path of the log file, opened for append below
+    return if (!$_[1]);  # $_[1]: label printed before the PID on every line
+    return if (!$_[2]);  # $_[2]: message text; split on newlines and written one row per line
+    
+    # Instead of die: warn and return
+    unless (open (LG,">>$_[0]")) {
+        # Write to stderr as a last resort
+        print STDERR "WARNING: Cannot open log file $_[0]: $!\n";
+        return;
+    }
+    
+    my ($sec,$min,$hour,$mday,$mon,$year) = (localtime())[0,1,2,3,4,5];
+    $mon += 1; $year += 1900;
+    my @msg = split("\n",$_[2]);
+    
+    foreach my $row (@msg) {
+        next if (!$row);
+        printf LG "%04d%02d%02d-%02d%02d%02d %s [%d] %s\n",$year,$mon,$mday,$hour,$min,$sec,$_[1],$$,$row;
+    }
+    
+    close (LG);
+    
+    if ($< ==0) {  # running as root: hand the file to $log_owner_user (falling back to root) and set mode 0660
+        my $uid = getpwnam $log_owner_user;
+        my $gid = getgrnam $log_owner_user;
+        if (!$gid) { $gid=getgrnam "root"; }
+        if (!$uid) { $uid=getpwnam "root"; }
+        chown $uid, $gid, $_[0];
+        chmod oct("0660"), $_[0];
     }
 }