parse_flow.pl 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290
  1. #!/usr/bin/perl
  2. #
  3. # Copyright (C) Roman Dmitiriev, rnd@rajven.ru
  4. #
  5. use FindBin '$Bin';
  6. use lib "$Bin/";
  7. use strict;
  8. use DBI;
  9. use Time::Local;
  10. use Net::Patricia;
  11. use Data::Dumper;
  12. use Date::Parse;
  13. use Socket;
  14. use Rstat::config;
  15. use Rstat::main;
  16. use Rstat::net_utils;
  17. use Rstat::mysql;
  18. setpriority(0,0,19);
  19. my $router_id;
  20. if (scalar @ARGV>1) { $router_id=shift(@ARGV); } else { $router_id=$ARGV[0]; }
  21. if (!$router_id) {
  22. db_log_error($dbh,"Router id not defined! Bye...");
  23. exit 110;
  24. }
  25. db_log_debug($dbh,"Import traffic from router id: $router_id start") if ($debug);
  26. # net objects
  27. my $users = new Net::Patricia;
  28. InitSubnets();
  29. my $dbt = init_traf_db();
  30. #get userid list
  31. my $user_auth_list = $dbh->prepare( "SELECT id,ip,user_id,save_traf FROM User_auth where deleted=0 ORDER by user_id,ip" );
  32. if ( !defined $user_auth_list ) { die "Cannot prepare statement: $DBI::errstr\n"; }
  33. $user_auth_list->execute;
  34. # user auth list
  35. my $authlist_ref = $user_auth_list->fetchall_arrayref();
  36. $user_auth_list->finish();
  37. my %user_stats;
  38. print "\nUser auth ip:\n" if ($debug);
  39. foreach my $row (@$authlist_ref) {
  40. $users->add_string($row->[1],$row->[0]);
  41. print "ip: $row->[1] auth_id: $row->[0]\n" if ($debug);
  42. $user_stats{$row->[0]}{net}=$row->[1];
  43. $user_stats{$row->[0]}{id}=$row->[0];
  44. $user_stats{$row->[0]}{user_id}=$row->[2];
  45. $user_stats{$row->[0]}{save_traf}=$row->[3];
  46. $user_stats{$row->[0]}{in}=0;
  47. $user_stats{$row->[0]}{out}=0;
  48. }
  49. my $last_time = localtime();
  50. my $time_string;
  51. my $dbtime;
  52. my $hour_date;
  53. my $minute_date;
  54. my @batch_sql_traf=();
  55. open(FH,"-");
  56. while (my $line=<FH>) {
  57. #1555573194.980;17 ; 77.243.0.12; 172.20.178.71; 53; 43432; 1; 134; 2; 1
  58. $line=~s/\s+//g;
  59. my ($l_time,$l_proto,$l_src_ip,$l_dst_ip,$l_src_port,$l_dst_port,$l_packets,$l_bytes,$l_in_dev,$l_out_dev) = split(/;/,$line);
  60. next if (!$l_time);
  61. next if ($l_src_ip eq '0.0.0.0');
  62. next if ($l_dst_ip eq '0.0.0.0');
  63. next if ($l_src_ip eq '255.255.255.255');
  64. next if ($l_dst_ip eq '255.255.255.255');
  65. next if ($Special_Nets->match_string($l_src_ip) or $Special_Nets->match_string($l_dst_ip));
  66. #unknown networks
  67. if (!$office_networks->match_string($l_src_ip) and !$office_networks->match_string($l_dst_ip)) {
  68. print "Unknown packet! src: $l_src_ip dst: $l_dst_ip in_dev: $l_in_dev out_dev: $l_out_dev\n";
  69. next;
  70. }
  71. #local forward
  72. if ($office_networks->match_string($l_src_ip) and $office_networks->match_string($l_dst_ip)) { next; }
  73. #free forward
  74. if ($office_networks->match_string($l_src_ip) and $free_networks->match_string($l_dst_ip)) { next; }
  75. if ($free_networks->match_string($l_src_ip) and $office_networks->match_string($l_dst_ip)) { next; }
  76. print "Flow: $line\n" if ($debug);
  77. my $l_src_ip_aton=StrToIp($l_src_ip);
  78. my $l_dst_ip_aton=StrToIp($l_dst_ip);
  79. #if ($l_time ne $l_time+0) { $l_time=time-600; }
  80. $last_time = $l_time;
  81. my ($sec,$min,$hour,$day,$month,$year,$zone) = (localtime($l_time))[0,1,2,3,4,5];
  82. $month++;
  83. $year += 1900;
  84. $time_string = sprintf "%04d-%02d-%02d %02d:%02d:%02d",$year,$month,$day,$hour,$min,$sec;
  85. $dbtime = $dbh->quote($time_string);
  86. $hour_date = $dbh->quote(sprintf "%04d-%02d-%02d %02d:00:00",$year,$month,$day,$hour);
  87. $minute_date = $dbh->quote(sprintf "%04d-%02d-%02d %02d:%02d:00",$year,$month,$day,$hour,$min);
  88. my $user_found = 0;
  89. # find user id
  90. if ($office_networks->match_string($l_src_ip)) {
  91. my $out_user = $users->match_string($l_src_ip);
  92. if ($out_user) {
  93. $user_stats{$out_user}{out} += $l_bytes;
  94. $user_stats{$out_user}{dbtime} = $minute_date;
  95. $user_stats{$out_user}{htime} = $hour_date;
  96. print "OUT: $out_user + $l_bytes sum: $user_stats{$out_user}{out}\n" if ($debug);
  97. $user_found = 1;
  98. if ($save_detail and $user_stats{$out_user}{save_traf}) {
  99. my $dSQL="INSERT INTO Traffic_detail (auth_id,router_id,timestamp,proto,src_ip,dst_ip,src_port,dst_port,bytes) VALUES($out_user,$router_id,$dbtime,'$l_proto',$l_src_ip_aton,$l_dst_ip_aton,'$l_src_port','$l_dst_port','$l_bytes')";
  100. push (@batch_sql_traf,$dSQL);
  101. }
  102. }
  103. }
  104. if ($office_networks->match_string($l_dst_ip)) {
  105. my $in_user = $users->match_string($l_dst_ip);
  106. if ($in_user) {
  107. $user_stats{$in_user}{in} += $l_bytes;
  108. $user_stats{$in_user}{dbtime} = $minute_date;
  109. $user_stats{$in_user}{htime} = $hour_date;
  110. print "IN: $in_user + $l_bytes sum: $user_stats{$in_user}{in}\n" if ($debug);
  111. $user_found = 1;
  112. if ($save_detail and $user_stats{$in_user}{save_traf}) {
  113. my $dSQL="INSERT INTO Traffic_detail (auth_id,router_id,timestamp,proto,src_ip,dst_ip,src_port,dst_port,bytes) VALUES($in_user,$router_id,$dbtime,'$l_proto',$l_src_ip_aton,$l_dst_ip_aton,'$l_src_port','$l_dst_port','$l_bytes')";
  114. push (@batch_sql_traf,$dSQL);
  115. }
  116. }
  117. }
  118. next if ($users->match_string($l_src_ip) or $users->match_string($l_dst_ip));
  119. next if (!$add_unknown_user);
  120. # find user ip
  121. my $user_ip;
  122. my $user_ip_aton;
  123. undef $user_ip;
  124. #add user by src ip only if dst not office network!!!!
  125. if (!$office_networks->match_string($l_dst_ip) and $office_networks->match_string($l_src_ip)) { $user_ip = $l_src_ip; }
  126. #skip unknown packet
  127. if (!$user_ip) { next; }
  128. $user_ip_aton=StrToIp($user_ip);
  129. db_log_warning($dbh,"New ip $user_ip added by netflow!");
  130. #default user
  131. my $new_user_id=get_new_user_id($dbh,$user_ip);
  132. my $insert_auth;
  133. $insert_auth->{ip}=$user_ip;
  134. $insert_auth->{ip_int}=$user_ip_aton;
  135. $insert_auth->{ip_int_end}=$user_ip_aton;
  136. $insert_auth->{user_id}=$new_user_id;
  137. $insert_auth->{enabled}="0";
  138. $insert_auth->{deleted}="0";
  139. $insert_auth->{save_traf}="$save_detail";
  140. insert_record($dbh,'User_auth',$insert_auth);
  141. my $sSQL="SELECT id,ip,user_id FROM User_auth where ip_int=\"$user_ip_aton\" and deleted=0";
  142. my $get_user_auth = $dbh->prepare($sSQL);
  143. if ( !defined $get_user_auth ) { die "Cannot prepare statement: $DBI::errstr\n"; }
  144. $get_user_auth->execute;
  145. # user auth list
  146. my $new_user = $get_user_auth->fetchall_arrayref();
  147. $get_user_auth->finish();
  148. my $auth_id;
  149. foreach my $row (@$new_user) {
  150. next if (!$row->[0]);
  151. $auth_id = $row->[0];
  152. $users->add_string($user_ip,$auth_id);
  153. $user_stats{$auth_id}{net}=$user_ip;
  154. $user_stats{$auth_id}{user_id}=$row->[2];
  155. $user_stats{$auth_id}{id}=$auth_id;
  156. $user_stats{$auth_id}{in}=0;
  157. $user_stats{$auth_id}{out}=0;
  158. db_log_info($dbh,"Added user_auth id: $auth_id ip: $user_ip user_id: $row->[2]");
  159. last;
  160. }
  161. print "ERROR add user_auth!\n" if (!$users->match_string($user_ip));
  162. if ($auth_id) {
  163. if ($save_detail) {
  164. my $dSQL="INSERT INTO Traffic_detail (auth_id,router_id,timestamp,proto,src_ip,dst_ip,src_port,dst_port,bytes) VALUES($auth_id,$router_id,$dbtime,'$l_proto',$l_src_ip_aton,$l_dst_ip_aton,'$l_src_port','$l_dst_port','$l_bytes')";
  165. push (@batch_sql_traf,$dSQL);
  166. }
  167. if ($l_src_ip eq $user_ip) {
  168. $user_stats{$auth_id}{out} += $l_bytes;
  169. }
  170. if ($l_dst_ip eq $user_ip) {
  171. $user_stats{$auth_id}{in} += $l_bytes;
  172. }
  173. $user_stats{$auth_id}{dbtime} = $minute_date;
  174. $user_stats{$auth_id}{htime} = $hour_date;
  175. } else {
  176. undef $user_ip;
  177. undef $user_ip_aton;
  178. }
  179. }
  180. my ($min,$hour,$day,$month,$year) = (localtime($last_time))[1,2,3,4,5];
  181. $month ++;
  182. $year += 1900;
  183. ######## user statistics
  184. print "Update traffic table...\n" if ($debug);
  185. # update database
  186. foreach my $row (keys %user_stats) {
  187. next if ($user_stats{$row}{in} + $user_stats{$row}{out} <= 0);
  188. # insert row
  189. my $statSQL="INSERT INTO User_traffic (timestamp,auth_id,router_id,byte_in,byte_out,byte_proxy) VALUES($user_stats{$row}{dbtime},$user_stats{$row}{id},$router_id,$user_stats{$row}{in},$user_stats{$row}{out},'0')";
  190. print "$statSQL\n" if ($debug);
  191. push (@batch_sql_traf,$statSQL);
  192. }
  193. ### hour stats
  194. # get current stats
  195. my $sql = "Select auth_id, SUM(byte_in),SUM(byte_out) from User_stats WHERE ((YEAR(timestamp)=$year) and (MONTH(timestamp)=$month) and (DAY(timestamp)=$day) and (HOUR(timestamp)=$hour) and router_id=$router_id) Group by auth_id order by auth_id";
  196. my $fth = $dbt->prepare($sql);
  197. $fth->execute;
  198. my $hour_stats=$fth->fetchall_arrayref();
  199. $fth->finish;
  200. # update database
  201. foreach my $row (keys %user_stats) {
  202. next if (!$user_stats{$row}{htime});
  203. my $found = 0;
  204. ### find current statistics
  205. foreach my $row2 (@$hour_stats) {
  206. my ($f_s,$f_id,$f_in,$f_out) = @$row2;
  207. if ($user_stats{$row}{id} eq $f_id) {
  208. $f_in += $user_stats{$row}{in};
  209. $f_out += $user_stats{$row}{out};
  210. $found = 1;
  211. my $ssql="UPDATE User_stats set byte_in='$f_in', byte_out='$f_out' WHERE (id=$f_s and router_id=$router_id)";
  212. my $res = $dbt->do($ssql);
  213. unless ($res) {
  214. my $dSQL="INSERT INTO User_stats (timestamp,auth_id,router_id,byte_in,byte_out) VALUES($user_stats{$row}{htime},'$user_stats{$row}{id}','$router_id','$f_in','$f_out')";
  215. push (@batch_sql_traf,$dSQL);
  216. }
  217. last;
  218. }
  219. }
  220. next if ($found);
  221. my $dSQL="INSERT INTO User_stats (timestamp,auth_id,router_id,byte_in,byte_out) VALUES($user_stats{$row}{htime},'$user_stats{$row}{id}','$router_id','$user_stats{$row}{in}','$user_stats{$row}{out}')";
  222. push (@batch_sql_traf,$dSQL);
  223. }
  224. $dbt->{AutoCommit} = 0;
  225. my $sth;
  226. foreach my $sSQL(@batch_sql_traf) {
  227. $sth = $dbt->prepare($sSQL);
  228. $sth->execute;
  229. }
  230. $sth->finish;
  231. $dbt->{AutoCommit} = 1;
  232. db_log_debug($dbh,"Import traffic from router id: $router_id stop") if ($debug);
  233. $dbt->disconnect;
  234. $dbh->disconnect;
  235. print "Done\n" if ($debug);
  236. exit 0;