parse_flow.pl 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351
  1. #!/usr/bin/perl
  2. #
  3. # Copyright (C) Roman Dmitiriev, rnd@rajven.ru
  4. #
  5. use utf8;
  6. use warnings;
  7. use Encode;
  8. use open qw(:std :encoding(UTF-8));
  9. no warnings 'utf8';
  10. use English;
  11. use base;
  12. use FindBin '$Bin';
  13. use lib "/opt/Eye/scripts";
  14. use strict;
  15. use DBI;
  16. use Time::Local;
  17. use Net::Patricia;
  18. use Data::Dumper;
  19. use Date::Parse;
  20. use Socket;
  21. use eyelib::config;
  22. use eyelib::main;
  23. use eyelib::net_utils;
  24. use eyelib::database;
  25. use eyelib::logconfig;
  26. use eyelib::common;
  27. use Parallel::ForkManager;
  # ---------------------------------------------------------------------------
  # Start-up: validate the command line, read settings and build the lookup
  # tables that the workers and the final database update rely on.
  # ---------------------------------------------------------------------------

  # The router id is mandatory. It is interpolated into SQL statements further
  # down, so nothing but a plain positive integer is accepted.
  if (!$ARGV[0] or $ARGV[0] !~ /\A\d+\z/) { exit 110; }
  my $router_id = $ARGV[0];

  # Upper bound for concurrently running workers.
  # NOTE(review): $cpu_count, $dbh and $debug are not declared in this file;
  # presumably exported by the eyelib modules - confirm.
  my $fork_count = $cpu_count*10;

  # Option 55 multiplied by 60; logged below as the timestep in seconds and
  # used as the divisor for the per-second rates in the final report.
  my $timeshift = get_option($dbh,55)*60;

  db_log_debug($dbh,"Import traffic from router id: $router_id start. Timestep $timeshift sec.") if ($debug);

  # Global counters. The "illegal" buckets are initialised as well, because
  # the workers count rejected lines under that key and the final report
  # prints it.
  my %stats = (
      pkt  => { all => 0, user_in => 0, user_out => 0, free => 0, unknown => 0, illegal => 0 },
      line => { all => 0, user => 0, free => 0, unknown => 0, illegal => 0 },
  );

  # net objects
  my $users = Net::Patricia->new;
  InitSubnets();

  # get userid list
  my @auth_list_ref = get_records_sql($dbh,"SELECT id,ip,user_id,save_traf FROM user_auth where deleted=0 ORDER by user_id");

  # Per-address accumulator, keyed by the address string stored in user_auth.
  my %user_stats;
  foreach my $row (@auth_list_ref) {
      # A record without an address cannot be inserted into the lookup trie.
      next if (!$row or !$row->{ip});
      $users->add_string($row->{ip},$row->{id});
      $user_stats{$row->{ip}} = {
          net       => $row->{ip},
          auth_id   => $row->{id},
          user_id   => $row->{user_id},
          save_traf => $row->{save_traf},
          in        => 0,
          out       => 0,
          pkt_in    => 0,
          pkt_out   => 0,
      };
  }

  # Timestamp of the last processed flow as epoch seconds. time() is required
  # here: localtime() in scalar context returns a formatted date string, which
  # the later localtime($last_time) call would interpret as 0.
  my $last_time = time();
  my $hour_date;
  my $minute_date;

  # SQL statements collected for the final batch write.
  my @batch_sql_traf=();
  # Worker pool. Every worker hands its partial statistics back to the parent
  # through the data reference given to finish(); the callback below merges
  # them into %user_stats and %stats.
  my $pm = Parallel::ForkManager->new($fork_count);

  $pm->run_on_finish(
      sub {
          my ($pid, $exit, $ident, $signal, $core, $data) = @_;
          return if (!$data);

          # The worker passes a reference to the hash reference that
          # calc_stats() returned; a worker without input returns nothing.
          my $dataref = ${$data};
          return if (!$dataref or ref($dataref) ne 'HASH');

          # The connection is used for debug logging only, so it is opened
          # only when debugging is enabled.
          my $f_dbh = $debug ? init_db() : undef;
          db_log_debug($f_dbh,"Run get data from child") if ($debug);

          # Per-address counters. A worker sets only the directions it has
          # seen, so absent fields are treated as zero.
          my $child_stats = $dataref->{stats} || {};
          foreach my $user_ip (keys %$child_stats) {
              foreach my $field (qw(in pkt_in out pkt_out)) {
                  $user_stats{$user_ip}{$field} += $child_stats->{$user_ip}{$field} || 0;
              }
          }

          # Global counters. "illegal" is merged too: it is the bucket the
          # workers fill for rejected lines and the one the final report reads.
          foreach my $field (qw(all user_in user_out free unknown illegal)) {
              $stats{pkt}{$field} += $dataref->{pkt}{$field} || 0;
          }
          foreach my $field (qw(all user free unknown illegal)) {
              $stats{line}{$field} += $dataref->{line}{$field} || 0;
          }

          # Keep the previous value when this worker saw no valid line.
          $last_time = $dataref->{last_time} if ($dataref->{last_time});

          if ($f_dbh) {
              db_log_debug($f_dbh,"Get data from child stopped") if ($debug);
              $f_dbh->disconnect;
          }
      }
  );
  # The start-up connection is closed before the first fork; every worker and
  # the final update phase open a connection of their own.
  $dbh->disconnect;

  my @input_buf=();
  my $line_count = 0;
  # Number of batches handed out so far; calc_stats() prints it in debug logs.
  my $child_count = 0;

  # Read flow records from STDIN and hand them to workers in batches.
  while (my $raw_line = <STDIN>) {
      chomp($raw_line);
      # Strip every whitespace character from the record.
      $raw_line=~s/\s+//g;
      $line_count++;
      push(@input_buf,$raw_line);

      # Keep buffering until 50000 lines are collected or an empty line is read.
      if ($line_count < 50000 and $raw_line =~ /\S/) { next; }

      $line_count = 0;
      $child_count ++;
      my @tmp = @input_buf;
      @input_buf = ();

      # start() returns the child pid in the parent and 0 in the child.
      $pm->start and next;
      my $ret = calc_stats(\@tmp);
      $pm->finish(0, \$ret);
  }

  # Flush the remainder. Only the child may process the batch: start() returns
  # a true value in the parent, and finish() does not exit there, so calling
  # calc_stats() unconditionally would process the batch a second time in the
  # parent and write its detail rows twice.
  if (scalar(@input_buf)) {
      $child_count ++;
      if (!$pm->start) {
          my $ret = calc_stats(\@input_buf);
          $pm->finish(0, \$ret);
      }
  }

  $pm->wait_all_children;
  undef(@input_buf);
  # calc_stats(\@lines)
  #
  # Classifies a batch of flow records and returns the partial statistics.
  #
  # Every record is a semicolon-separated string with the fields
  #   time;proto;src_ip;dst_ip;src_port;dst_port;packets;bytes;in_dev;out_dev
  # (the time field is passed to localtime(), so it is presumably epoch
  # seconds - confirm against the flow exporter).
  #
  # Returns nothing for an empty batch, otherwise a hash reference:
  #   {pkt}{...}, {line}{...}  counters for this batch
  #   {last_time}              time field of the last accepted record
  #   {stats}{<ip>}            per-address in/out/pkt_in/pkt_out counters
  #
  # Side effect: when $config_ref{save_detail} is set, detail rows are written
  # to traffic_detail through a connection opened by this function.
  sub calc_stats {
      my $lines = shift;
      return if (!$lines or !scalar @$lines);

      my $f_dbh = init_db();
      db_log_debug($f_dbh,"Started child $child_count for ".scalar @$lines." lines count") if ($debug);

      my $lines_stats = {
          pkt  => { all => 0, user_in => 0, user_out => 0, free => 0, unknown => 0, illegal => 0 },
          line => { all => 0, user => 0, free => 0, unknown => 0, illegal => 0 },
      };
      my @detail_traffic=();

      # One rejected record.
      my $count_illegal = sub {
          my $packets = shift;
          $lines_stats->{line}{illegal}++;
          $lines_stats->{pkt}{illegal}+=$packets;
      };
      # One record that is not accounted to any user. Packets go to the packet
      # counter; adding them to the line counter would corrupt the line total.
      my $count_free = sub {
          my $packets = shift;
          $lines_stats->{line}{free}++;
          $lines_stats->{pkt}{free}+=$packets;
      };

      foreach my $line (@$lines) {
          next if (!$line);
          my ($l_time,$l_proto,$l_src_ip,$l_dst_ip,$l_src_port,$l_dst_port,$l_packets,$l_bytes,$l_in_dev,$l_out_dev) = split(/;/,$line);
          next if (!$l_time or !$l_src_ip or !$l_dst_ip);

          # Truncated records carry no counters; treat them as zero so the
          # arithmetic below stays numeric.
          $l_packets ||= 0;
          $l_bytes   ||= 0;

          $lines_stats->{pkt}{all}+=$l_packets;
          $lines_stats->{line}{all}++;
          $lines_stats->{last_time} = $l_time;

          # unspecified and broadcast addresses
          if ($l_src_ip eq '0.0.0.0' or $l_dst_ip eq '0.0.0.0'
              or $l_src_ip eq '255.255.255.255' or $l_dst_ip eq '255.255.255.255') {
              $count_illegal->($l_packets);
              next;
          }

          # special networks (the parentheses keep the definedness check in
          # front of both lookups)
          if ($Special_Nets and ($Special_Nets->match_string($l_src_ip) or $Special_Nets->match_string($l_dst_ip))) {
              $count_illegal->($l_packets);
              next;
          }

          my $src_office = $office_networks ? $office_networks->match_string($l_src_ip) : 0;
          my $dst_office = $office_networks ? $office_networks->match_string($l_dst_ip) : 0;
          if ($office_networks) {
              # unknown networks
              if (!$src_office and !$dst_office) { $count_illegal->($l_packets); next; }
              # local forward
              if ($src_office and $dst_office) { $count_free->($l_packets); next; }
          }

          # free forward, in either direction
          if ($free_networks) {
              my $src_free = $free_networks->match_string($l_src_ip);
              my $dst_free = $free_networks->match_string($l_dst_ip);
              if (($src_office and $dst_free) or ($src_free and $dst_office)) {
                  $count_free->($l_packets);
                  next;
              }
          }

          my $l_src_ip_aton=StrToIp($l_src_ip);
          my $l_dst_ip_aton=StrToIp($l_dst_ip);

          my ($sec,$min,$hour,$day,$month,$year) = (localtime($l_time))[0,1,2,3,4,5];
          my $full_time = sprintf "%04d-%02d-%02d %02d:%02d:%02d",$year+1900,$month+1,$day,$hour,$min,$sec;

          # find user; when both ends are known users the destination wins
          my $user_ip;
          if ($users->match_string($l_src_ip)) {
              $user_ip = $l_src_ip;
              my $entry = $lines_stats->{stats}{$user_ip} ||= {};
              $entry->{ip} = $user_ip;
              $entry->{out}     = ($entry->{out} || 0) + $l_bytes;
              $entry->{pkt_out} = ($entry->{pkt_out} || 0) + $l_packets;
              $lines_stats->{line}{user}++;
              $lines_stats->{pkt}{user_out}+=$l_packets;
          }
          if ($users->match_string($l_dst_ip)) {
              $user_ip = $l_dst_ip;
              my $entry = $lines_stats->{stats}{$user_ip} ||= {};
              $entry->{ip} = $l_dst_ip;
              $entry->{in}     = ($entry->{in} || 0) + $l_bytes;
              $entry->{pkt_in} = ($entry->{pkt_in} || 0) + $l_packets;
              $lines_stats->{line}{user}++;
              $lines_stats->{pkt}{user_in}+=$l_packets;
          }

          my $auth_id = $user_ip ? $users->match_string($user_ip) : 0;

          # save full packet
          if ($config_ref{save_detail}) {
              my @detail_array = ($auth_id,$router_id,$full_time,$l_proto,$l_src_ip_aton,$l_dst_ip_aton,$l_src_port,$l_dst_port,$l_bytes,$l_packets);
              if ($auth_id and $user_stats{$user_ip}{save_traf}) { push(@detail_traffic,\@detail_array); }
              if (!$auth_id and $config_ref{add_unknown_user}) { push(@detail_traffic,\@detail_array); }
          }

          if ($auth_id) { next; }

          if (!$config_ref{add_unknown_user}) { $count_illegal->($l_packets); next; }

          # add user by src ip only if dst not office network!!!!
          # ignore dst traffic for create user
          if (!$dst_office and $src_office) {
              $user_ip = $l_src_ip;
              $users->add_string($user_ip,0);
          }

          if (!$user_ip) { $count_illegal->($l_packets); next; }

          if ($user_ip eq $l_src_ip) {
              my $entry = $lines_stats->{stats}{$user_ip} ||= {};
              $entry->{ip} = $user_ip;
              # auth id 0: no user_auth record is known for this address yet
              $entry->{auth_id} = 0;
              $entry->{out}     = ($entry->{out} || 0) + $l_bytes;
              $entry->{pkt_out} = ($entry->{pkt_out} || 0) + $l_packets;
              $lines_stats->{line}{user}++;
              $lines_stats->{pkt}{user_out}+=$l_packets;
          }
      }

      db_log_debug($f_dbh,"Stopped child $child_count analyze data") if ($debug);

      if (scalar(@detail_traffic)) {
          db_log_debug($f_dbh,"Start write traffic detail to DB. ".scalar @detail_traffic." lines count") if ($debug);
          batch_db_sql_cached($f_dbh,"INSERT INTO traffic_detail (auth_id,router_id,ts,proto,src_ip,dst_ip,src_port,dst_port,bytes,pkt) VALUES(?,?,?,?,?,?,?,?,?,?)",\@detail_traffic);
          db_log_debug($f_dbh,"Write traffic detail to DB stopped") if ($debug);
      }

      $f_dbh->disconnect;
      return $lines_stats;
  }
  # ---------------------------------------------------------------------------
  # Final phase: write the merged statistics to the database and log a summary.
  # ---------------------------------------------------------------------------
  my $m_dbh=init_db();
  ####################################################################################################

  # $last_time must be epoch seconds for the localtime() calls below. time()
  # is used as the fallback: localtime() in scalar context returns a formatted
  # date string, which is not a usable timestamp.
  if (!$last_time or $last_time !~ /\A\d+\z/) { $last_time = time(); }

  #start hour
  my ($min,$hour,$day,$month,$year) = (localtime($last_time))[1,2,3,4,5];
  #flow time
  my $flow_date = $m_dbh->quote(sprintf "%04d-%02d-%02d %02d:%02d:00",$year+1900,$month+1,$day,$hour,$min);
  #start stat time
  my $hour_date1 = $m_dbh->quote(sprintf "%04d-%02d-%02d %02d:00:00",$year+1900,$month+1,$day,$hour);
  #end hour
  ($hour,$day,$month,$year) = (localtime($last_time+3600))[2,3,4,5];
  my $hour_date2 = $m_dbh->quote(sprintf "%04d-%02d-%02d %02d:00:00",$year+1900,$month+1,$day,$hour);

  # update database
  foreach my $user_ip (keys %user_stats) {
      my $user_ip_aton=StrToIp($user_ip);
      my $auth_id = $user_stats{$user_ip}{auth_id};
      if (!$auth_id) {
          $auth_id=new_auth($m_dbh,$user_ip);
          # Without an id every statement below would be malformed.
          # NOTE(review): assumes new_auth() returns a false value on failure - confirm.
          next if (!$auth_id);
          $user_stats{$user_ip}{auth_id}=$auth_id;
          #fix traffic detail for new users
          push(@batch_sql_traf,"UPDATE traffic_detail set auth_id=$auth_id WHERE auth_id=0 AND ts>=$hour_date1 AND ts<$hour_date2 AND (src_ip=$user_ip_aton OR dst_ip=$user_ip_aton)");
      }

      # Counters may be missing for addresses that were created on the fly.
      my $byte_in  = $user_stats{$user_ip}{in}      || 0;
      my $byte_out = $user_stats{$user_ip}{out}     || 0;
      my $pkt_in   = $user_stats{$user_ip}{pkt_in}  || 0;
      my $pkt_out  = $user_stats{$user_ip}{pkt_out} || 0;

      #skip empty stats
      if ($byte_in + $byte_out == 0) { next; }

      #current stats
      my $tSQL="INSERT INTO user_stats_full (ts,auth_id,router_id,byte_in,byte_out,pkt_in,pkt_out,step) VALUES($flow_date,'$auth_id','$router_id','$byte_in','$byte_out','$pkt_in','$pkt_out','$timeshift')";
      push (@batch_sql_traf,$tSQL);

      #last found timestamp
      $tSQL="UPDATE user_auth SET last_found=$flow_date WHERE id='$auth_id'";
      push (@batch_sql_traf,$tSQL);

      #hour stats
      # get current stats
      my $sql = "SELECT id, byte_in, byte_out FROM user_stats WHERE ts>=$hour_date1 AND ts<$hour_date2 AND router_id=$router_id AND auth_id=$auth_id";
      my $hour_stat = get_record_sql($m_dbh,$sql);
      if (!$hour_stat) {
          my $dSQL="INSERT INTO user_stats (ts,auth_id,router_id,byte_in,byte_out) VALUES($flow_date,'$auth_id','$router_id','$byte_in','$byte_out')";
          push (@batch_sql_traf,$dSQL);
          next;
      }

      my $hour_in  = ($hour_stat->{byte_in}  || 0) + $byte_in;
      my $hour_out = ($hour_stat->{byte_out} || 0) + $byte_out;

      # Address the row that was just read by its own id; the auth id is a
      # different key and would select an unrelated row (or none at all).
      $tSQL="UPDATE user_stats SET byte_in='".$hour_in."', byte_out='".$hour_out."' WHERE id='".$hour_stat->{id}."' AND router_id='".$router_id."'";
      push (@batch_sql_traf,$tSQL);
  }

  batch_db_sql($m_dbh,\@batch_sql_traf);

  db_log_debug($m_dbh,"Import traffic from router id: $router_id stop") if ($debug);

  db_log_verbose($m_dbh,"Recalc quotes started");
  recalc_quotes($m_dbh,$router_id);
  db_log_verbose($m_dbh,"Recalc quotes stopped");

  # Summary. Counters that were never touched are reported as 0, and the rate
  # divisor falls back to 1 so that a zero timestep cannot abort the script
  # with a division by zero.
  my $rate_base    = $timeshift || 1;
  my $line_all     = $stats{line}{all}     || 0;
  my $line_user    = $stats{line}{user}    || 0;
  my $line_free    = $stats{line}{free}    || 0;
  my $line_illegal = $stats{line}{illegal} || 0;
  my $pkt_all      = $stats{pkt}{all}      || 0;
  my $pkt_user_in  = $stats{pkt}{user_in}  || 0;
  my $pkt_user_out = $stats{pkt}{user_out} || 0;
  my $pkt_free     = $stats{pkt}{free}     || 0;
  my $pkt_illegal  = $stats{pkt}{illegal}  || 0;

  db_log_verbose($m_dbh,"router id: $router_id stop Traffic statistics, lines: all => $line_all, user=> $line_user, free => $line_free, illegal=> $line_illegal");
  db_log_verbose($m_dbh,sprintf("router id: %d stop Traffic speed, line/s: all => %.2f, user=> %.2f, free => %.2f, unknown=> %.2f", $router_id, $line_all/$rate_base, $line_user/$rate_base, $line_free/$rate_base, $line_illegal/$rate_base));
  db_log_verbose($m_dbh,"router id: $router_id stop Traffic statistics, pkt: all => $pkt_all, user_in=> $pkt_user_in, user_out=> $pkt_user_out, free => $pkt_free, illegal=> $pkt_illegal");
  db_log_verbose($m_dbh,sprintf("router id: %d stop Traffic speed, pkt/s: all => %.2f, user_in=> %.2f, user_out=> %.2f, free => %.2f, unknown=> %.2f", $router_id, $pkt_all/$rate_base, $pkt_user_in/$rate_base, $pkt_user_out/$rate_base, $pkt_free/$rate_base, $pkt_illegal/$rate_base));

  $m_dbh->disconnect;
  exit 0;