1
0

parse_flow.pl 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
  1. #!/usr/bin/perl
  2. #
  3. # Copyright (C) Roman Dmitiriev, rnd@rajven.ru
  4. #
  5. use FindBin '$Bin';
  6. use lib "$Bin/";
  7. use strict;
  8. use DBI;
  9. use Time::Local;
  10. use Net::Patricia;
  11. use Data::Dumper;
  12. use Date::Parse;
  13. use Socket;
  14. use Rstat::config;
  15. use Rstat::main;
  16. use Rstat::net_utils;
  17. use Rstat::mysql;
  18. setpriority(0,0,19);
  19. my $router_id;
  20. if (scalar @ARGV>1) { $router_id=shift(@ARGV); } else { $router_id=$ARGV[0]; }
  21. if (!$router_id) {
  22. db_log_error($dbh,"Router id not defined! Bye...");
  23. exit 110;
  24. }
  25. my $timeshift = get_option($dbh,55)*60;
  26. db_log_debug($dbh,"Import traffic from router id: $router_id start. Timestep $timeshift sec.") if ($debug);
  27. my %stats;
  28. $stats{pkt}{all}=0;
  29. $stats{pkt}{user}=0;
  30. $stats{pkt}{user_in}=0;
  31. $stats{pkt}{user_out}=0;
  32. $stats{pkt}{free}=0;
  33. $stats{pkt}{unknown}=0;
  34. $stats{line}{all}=0;
  35. $stats{line}{user}=0;
  36. $stats{line}{free}=0;
  37. $stats{line}{unknown}=0;
  38. # net objects
  39. my $users = new Net::Patricia;
  40. InitSubnets();
  41. #get userid list
  42. my $user_auth_list = $dbh->prepare( "SELECT id,ip,user_id,save_traf FROM User_auth where deleted=0 ORDER by user_id,ip" );
  43. if ( !defined $user_auth_list ) { die "Cannot prepare statement: $DBI::errstr\n"; }
  44. $user_auth_list->execute;
  45. # user auth list
  46. my $authlist_ref = $user_auth_list->fetchall_arrayref();
  47. $user_auth_list->finish();
  48. my %user_stats;
  49. foreach my $row (@$authlist_ref) {
  50. $users->add_string($row->[1],$row->[0]);
  51. $user_stats{$row->[0]}{net}=$row->[1];
  52. $user_stats{$row->[0]}{id}=$row->[0];
  53. $user_stats{$row->[0]}{user_id}=$row->[2];
  54. $user_stats{$row->[0]}{save_traf}=$row->[3];
  55. $user_stats{$row->[0]}{in}=0;
  56. $user_stats{$row->[0]}{out}=0;
  57. $user_stats{$row->[0]}{pkt_in}=0;
  58. $user_stats{$row->[0]}{pkt_out}=0;
  59. }
  60. my $last_time = localtime();
  61. my $time_string;
  62. my $dbtime;
  63. my $hour_date;
  64. my $minute_date;
  65. my @batch_sql_traf=();
  66. open(FH,"-");
  67. while (my $line=<FH>) {
  68. $stats{line}{all}++;
  69. #1555573194.980;17 ; 77.243.0.12; 172.20.178.71; 53; 43432; 1; 134; 2; 1
  70. $line=~s/\s+//g;
  71. my ($l_time,$l_proto,$l_src_ip,$l_dst_ip,$l_src_port,$l_dst_port,$l_packets,$l_bytes,$l_in_dev,$l_out_dev) = split(/;/,$line);
  72. $stats{pkt}{all}+=$l_packets;
  73. if (!$l_time) { $stats{line}{illegal}++; $stats{pkt}{illegal}+=$l_packets; next; }
  74. if ($l_src_ip eq '0.0.0.0') { $stats{line}{illegal}++; $stats{pkt}{illegal}+=$l_packets; next; }
  75. if ($l_dst_ip eq '0.0.0.0') { $stats{line}{illegal}++; $stats{pkt}{illegal}+=$l_packets; next; }
  76. if ($l_src_ip eq '255.255.255.255') { $stats{line}{illegal}++; $stats{pkt}{illegal}+=$l_packets; next; }
  77. if ($l_dst_ip eq '255.255.255.255') { $stats{line}{illegal}++; $stats{pkt}{illegal}+=$l_packets; next; }
  78. if ($Special_Nets->match_string($l_src_ip) or $Special_Nets->match_string($l_dst_ip)) { $stats{line}{illegal}++; $stats{pkt}{illegal}+=$l_packets; next; }
  79. #unknown networks
  80. if (!$office_networks->match_string($l_src_ip) and !$office_networks->match_string($l_dst_ip)) { $stats{line}{illegal}++; $stats{pkt}{illegal}+=$l_packets; next; }
  81. #local forward
  82. if ($office_networks->match_string($l_src_ip) and $office_networks->match_string($l_dst_ip)) { $stats{line}{free}++; $stats{line}{free}+=$l_packets; next; }
  83. #free forward
  84. if ($office_networks->match_string($l_src_ip) and $free_networks->match_string($l_dst_ip)) { $stats{line}{free}++; $stats{line}{free}+=$l_packets; next; }
  85. if ($free_networks->match_string($l_src_ip) and $office_networks->match_string($l_dst_ip)) { $stats{line}{free}++; $stats{line}{free}+=$l_packets; next; }
  86. my $l_src_ip_aton=StrToIp($l_src_ip);
  87. my $l_dst_ip_aton=StrToIp($l_dst_ip);
  88. $last_time = $l_time;
  89. my ($sec,$min,$hour,$day,$month,$year,$zone) = (localtime($l_time))[0,1,2,3,4,5];
  90. $month++;
  91. $year += 1900;
  92. $time_string = sprintf "%04d-%02d-%02d %02d:%02d:%02d",$year,$month,$day,$hour,$min,$sec;
  93. $dbtime = $dbh->quote($time_string);
  94. $hour_date = $dbh->quote(sprintf "%04d-%02d-%02d %02d:00:00",$year,$month,$day,$hour);
  95. $minute_date = $dbh->quote(sprintf "%04d-%02d-%02d %02d:%02d:00",$year,$month,$day,$hour,$min);
  96. my $user_found = 0;
  97. # find user id
  98. if ($office_networks->match_string($l_src_ip)) {
  99. my $out_user = $users->match_string($l_src_ip);
  100. if ($out_user) {
  101. $user_stats{$out_user}{out} += $l_bytes;
  102. $user_stats{$out_user}{dbtime} = $minute_date;
  103. $user_stats{$out_user}{htime} = $hour_date;
  104. $user_stats{$out_user}{pkt_out} +=$l_packets;
  105. $user_found = 1;
  106. $stats{line}{user}++;
  107. $stats{pkt}{user_out}+=$l_packets;
  108. if ($save_detail and $user_stats{$out_user}{save_traf}) {
  109. my $dSQL="INSERT INTO Traffic_detail (auth_id,router_id,timestamp,proto,src_ip,dst_ip,src_port,dst_port,bytes,pkt) VALUES($out_user,$router_id,$dbtime,'$l_proto',$l_src_ip_aton,$l_dst_ip_aton,'$l_src_port','$l_dst_port','$l_bytes','$l_packets')";
  110. push (@batch_sql_traf,$dSQL);
  111. }
  112. }
  113. }
  114. if ($office_networks->match_string($l_dst_ip)) {
  115. my $in_user = $users->match_string($l_dst_ip);
  116. if ($in_user) {
  117. $user_stats{$in_user}{in} += $l_bytes;
  118. $user_stats{$in_user}{dbtime} = $minute_date;
  119. $user_stats{$in_user}{htime} = $hour_date;
  120. $user_stats{$in_user}{pkt_in} +=$l_packets;
  121. $stats{line}{user}++;
  122. $stats{pkt}{user_in}+=$l_packets;
  123. $user_found = 1;
  124. if ($save_detail and $user_stats{$in_user}{save_traf}) {
  125. my $dSQL="INSERT INTO Traffic_detail (auth_id,router_id,timestamp,proto,src_ip,dst_ip,src_port,dst_port,bytes,pkt) VALUES($in_user,$router_id,$dbtime,'$l_proto',$l_src_ip_aton,$l_dst_ip_aton,'$l_src_port','$l_dst_port','$l_bytes','$l_packets')";
  126. push (@batch_sql_traf,$dSQL);
  127. }
  128. }
  129. }
  130. if (scalar(@batch_sql_traf)>100000) {
  131. $dbh->{AutoCommit} = 0;
  132. my $f_sth;
  133. foreach my $sSQL(@batch_sql_traf) {
  134. $f_sth = $dbh->prepare($sSQL);
  135. $f_sth->execute;
  136. }
  137. $f_sth->finish;
  138. $dbh->{AutoCommit} = 1;
  139. @batch_sql_traf=();
  140. }
  141. if ($users->match_string($l_src_ip) or $users->match_string($l_dst_ip)) { next; }
  142. if (!$add_unknown_user) { $stats{line}{illegal}++; $stats{pkt}{illegal}+=$l_packets; next; }
  143. # find user ip
  144. my $user_ip;
  145. my $user_ip_aton;
  146. undef $user_ip;
  147. #add user by src ip only if dst not office network!!!!
  148. if (!$office_networks->match_string($l_dst_ip) and $office_networks->match_string($l_src_ip)) { $user_ip = $l_src_ip; }
  149. #skip unknown packet
  150. if (!$user_ip) { $stats{line}{illegal}++; $stats{pkt}{illegal}+=$l_packets; next; }
  151. $stats{line}{user}++;
  152. $user_ip_aton=StrToIp($user_ip);
  153. #new user
  154. my $auth_id=new_auth($dbh,$user_ip);
  155. next if (!$auth_id);
  156. db_log_warning($dbh,"New ip $user_ip added by netflow!");
  157. my $new_user = get_record_sql($dbh,"SELECT * FROM User_auth WHERE id=$auth_id");
  158. $users->add_string($user_ip,$auth_id);
  159. $user_stats{$auth_id}{net}=$user_ip;
  160. $user_stats{$auth_id}{user_id}=$new_user->{user_id};
  161. $user_stats{$auth_id}{id}=$auth_id;
  162. $user_stats{$auth_id}{in}=0;
  163. $user_stats{$auth_id}{out}=0;
  164. $user_stats{$auth_id}{pkt_in}=0;
  165. $user_stats{$auth_id}{pkt_out}=0;
  166. db_log_info($dbh,"Added user_auth id: $auth_id ip: $user_ip user_id: $new_user->{user_id}");
  167. if ($auth_id) {
  168. if ($save_detail) {
  169. my $dSQL="INSERT INTO Traffic_detail (auth_id,router_id,timestamp,proto,src_ip,dst_ip,src_port,dst_port,bytes) VALUES($auth_id,$router_id,$dbtime,'$l_proto',$l_src_ip_aton,$l_dst_ip_aton,'$l_src_port','$l_dst_port','$l_bytes')";
  170. push (@batch_sql_traf,$dSQL);
  171. }
  172. if ($l_src_ip eq $user_ip) {
  173. $user_stats{$auth_id}{out} += $l_bytes;
  174. $user_stats{$auth_id}{pkt_out} += $l_bytes;
  175. }
  176. if ($l_dst_ip eq $user_ip) {
  177. $user_stats{$auth_id}{in} += $l_bytes;
  178. $user_stats{$auth_id}{pkt_in} += $l_bytes;
  179. }
  180. $user_stats{$auth_id}{dbtime} = $minute_date;
  181. $user_stats{$auth_id}{htime} = $hour_date;
  182. } else {
  183. undef $user_ip;
  184. undef $user_ip_aton;
  185. }
  186. }
  187. #start hour
  188. my ($min,$hour,$day,$month,$year) = (localtime($last_time))[1,2,3,4,5];
  189. my $hour_date1 = $dbh->quote(sprintf "%04d-%02d-%02d %02d:00:00",$year+1900,$month+1,$day,$hour);
  190. my $flow_date = $dbh->quote(sprintf "%04d-%02d-%02d %02d:%02d:00",$year+1900,$month+1,$day,$hour,$min);
  191. #end hour
  192. ($min,$hour,$day,$month,$year) = (localtime($last_time+3600))[1,2,3,4,5];
  193. my $hour_date2 = $dbh->quote(sprintf "%04d-%02d-%02d %02d:00:00",$year+1900,$month+1,$day,$hour);
  194. # update database
  195. foreach my $row (keys %user_stats) {
  196. next if (!$user_stats{$row}{htime});
  197. #current stats
  198. my $tSQL="INSERT INTO User_stats_full (timestamp,auth_id,router_id,byte_in,byte_out,pkt_in,pkt_out,step) VALUES($flow_date,'$user_stats{$row}{id}','$router_id','$user_stats{$row}{in}','$user_stats{$row}{out}','$user_stats{$row}{pkt_in}','$user_stats{$row}{pkt_out}','$timeshift')";
  199. push (@batch_sql_traf,$tSQL);
  200. #last found timestamp
  201. $tSQL="UPDATE User_auth SET `last_found`=$flow_date WHERE id='$user_stats{$row}{id}'";
  202. push (@batch_sql_traf,$tSQL);
  203. #hour stats
  204. # get current stats
  205. my $sql = "SELECT id, byte_in, byte_out FROM User_stats
  206. WHERE `timestamp`>=$hour_date1 AND `timestamp`<$hour_date2 AND router_id=$router_id AND auth_id=$user_stats{$row}{id}";
  207. my $hour_stat = get_record_sql($dbh,$sql);
  208. if (!$hour_stat) {
  209. my $dSQL="INSERT INTO User_stats (timestamp,auth_id,router_id,byte_in,byte_out) VALUES($user_stats{$row}{htime},'$user_stats{$row}{id}','$router_id','$user_stats{$row}{in}','$user_stats{$row}{out}')";
  210. push (@batch_sql_traf,$dSQL);
  211. next;
  212. }
  213. if (!$hour_stat->{byte_in}) { $hour_stat->{byte_in}=0; }
  214. if (!$hour_stat->{byte_out}) { $hour_stat->{byte_out}=0; }
  215. $hour_stat->{byte_in} += $user_stats{$row}{in};
  216. $hour_stat->{byte_out} += $user_stats{$row}{out};
  217. $tSQL="UPDATE User_stats SET byte_in='".$hour_stat->{byte_in}."', byte_out='".$hour_stat->{byte_out}."' WHERE id=".$hour_stat->{id};
  218. push (@batch_sql_traf,$tSQL);
  219. }
  220. $dbh->{AutoCommit} = 0;
  221. my $sth;
  222. foreach my $sSQL(@batch_sql_traf) {
  223. $sth = $dbh->prepare($sSQL);
  224. $sth->execute;
  225. }
  226. $sth->finish;
  227. $dbh->{AutoCommit} = 1;
  228. db_log_debug($dbh,"Import traffic from router id: $router_id stop") if ($debug);
  229. db_log_verbose($dbh,"Recalc quotes started");
  230. recalc_quotes($dbh,$router_id);
  231. db_log_verbose($dbh,"Recalc quotes stopped");
  232. db_log_verbose($dbh,"router id: $router_id stop Traffic statistics, lines: all => $stats{line}{all}, user=> $stats{line}{user}, free => $stats{line}{free}, illegal=> $stats{line}{illegal}");
  233. db_log_verbose($dbh,sprintf("router id: %d stop Traffic speed, line/s: all => %.2f, user=> %.2f, free => %.2f, unknown=> %.2f", $router_id, $stats{line}{all}/$timeshift, $stats{line}{user}/$timeshift, $stats{line}{free}/$timeshift, $stats{line}{illegal}/$timeshift));
  234. db_log_verbose($dbh,"router id: $router_id stop Traffic statistics, pkt: all => $stats{pkt}{all}, user_in=> $stats{pkt}{user_in}, user_in=> $stats{pkt}{user_out}, free => $stats{pkt}{free}, illegal=> $stats{pkt}{illegal}");
  235. db_log_verbose($dbh,sprintf("router id: %d stop Traffic speed, pkt/s: all => %.2f, user_in=> %.2f, user_out=> %.2f, free => %.2f, unknown=> %.2f", $router_id, $stats{pkt}{all}/$timeshift, $stats{pkt}{user_in}/$timeshift, $stats{pkt}{user_out}/$timeshift, $stats{pkt}{free}/$timeshift, $stats{pkt}{illegal}/$timeshift));
  236. $dbh->disconnect;
  237. exit 0;