analysis.rb - warvox - VoIP based wardialing tool, forked from rapid7/warvox.
HTML git clone git://jay.scot/warvox
DIR Log
DIR Files
DIR Refs
DIR README
---
analysis.rb (10770B)
---
1 module WarVOX
2 module Jobs
3 class Analysis < Base
4
5 require 'fileutils'
6 require 'tempfile'
7 require 'open3'
8
9 # This is required by the verify_install.rb script, so don't error
10 # out if the gem is not yet available
11 begin
12 require 'kissfft'
13 rescue ::LoadError
14 end
15
# Evaluation context for line-type signature scripts.
#
# A signature script is plain Ruby source that is evaluated with the
# Classifier instance as +self+, so it can read +data+ / +signatures+ and
# assign +@line_type+. A script may raise Completed to stop early.
class Classifier

  # Raised by a signature script to end its own evaluation; swallowed by #proc.
  class Completed < RuntimeError; end

  attr_accessor :line_type, :signatures, :data

  def initialize
    @signatures = []
    @data = {}
  end

  # Evaluates +str+ as Ruby code in the context of this instance.
  #
  # NOTE(review): this is a plain eval, so signature files must come from a
  # trusted location.
  #
  # Returns the value of the evaluated code, or nil when the code raised
  # Completed. Any other exception propagates to the caller.
  def proc(str)
    eval(str)
  rescue Completed
    nil
  end
end
37
# Identifies the kind of job this class implements.
#
# Returns the String 'analysis'.
def type
  'analysis'
end
41
# Stores the job identity and configuration and starts with no worker
# threads and no calls.
#
# job_id - id of the Job record this analysis belongs to (looked up in #start)
# conf   - Hash of options; #start reads :scope, :force, :target_id and
#          :target_ids from it
def initialize(job_id, conf)
  @job_id, @conf = job_id, conf
  @tasks, @calls = [], []
end
48
# Aborts the analysis: forgets the call list and kills every tracked worker.
#
# Killing is best effort; an error raised while killing one task is ignored
# so the remaining tasks are still processed.
def stop
  @calls = []
  @tasks.each do |task|
    begin
      task.kill
    rescue ::StandardError
      nil
    end
  end
  @tasks = []
end
56
# Runs the analysis job: selects the calls to analyze and processes them on
# a bounded set of worker threads.
#
# The selection depends on @conf[:scope]:
#   'calls'   - calls whose id is in @conf[:target_ids]
#   'job'     - calls whose job_id is @conf[:target_id]
#   'project' - calls whose project_id is @conf[:target_id]
#   'global'  - every call
# In all cases only answered, non-busy calls are considered, and unless
# @conf[:force] is set only calls with no analysis_started_at.
# Any other scope makes the method return without doing anything.
#
# Exceptions raised while running are logged through WarVOX::Log.error and
# not re-raised.
def start
  @calls = []
  query = nil

  ::ActiveRecord::Base.connection_pool.with_connection {
    begin
      job = Job.find(@job_id)
      raise RuntimeError, "The parent job no longer exists" unless job

      scope_filter =
        case @conf[:scope]
        when 'calls'   then { id: @conf[:target_ids] }
        when 'job'     then { job_id: @conf[:target_id] }
        when 'project' then { project_id: @conf[:target_id] }
        when 'global'  then {}
        end

      # Bail if we don't have a valid scope
      return unless scope_filter

      query = scope_filter.merge(answered: true, busy: false)
      query[:analysis_started_at] = nil unless @conf[:force]

      # Build a list of call IDs, as find_each() gets confused if the DB changes mid-iteration
      pending = Call.where(query).map(&:id)

      @total_calls = pending.length
      @completed_calls = 0

      max_threads = WarVOX::Config.analysis_threads
      last_update = Time.now

      until pending.empty?
        if @tasks.length < max_threads
          # Each worker checks out its own database connection.
          worker = Thread.new(pending.shift, job.id) do |c, j|
            ::ActiveRecord::Base.connection_pool.with_connection { run_analyze_call(c, j) }
          end
          @tasks << worker
        else
          # All worker slots are busy: drop finished threads and wait a little.
          clear_stale_tasks

          # Update progress every 10 seconds or so
          if Time.now.to_f - last_update.to_f > 10
            update_progress((@completed_calls / @total_calls.to_f) * 100)
            last_update = Time.now
          end

          clear_zombies
        end
      end

      # Wait for the remaining workers before the final cleanup.
      @tasks.each(&:join)
      clear_stale_tasks
      clear_zombies

    rescue ::Exception => e
      WarVOX::Log.error("Exception: #{e.class} #{e} #{e.backtrace}")
    end
  }
end
137
# Drops worker threads that are no longer running from @tasks, then pauses
# for a quarter of a second.
#
# Thread#status is false or nil once a thread has finished, so only threads
# with a truthy status are kept. The IO.select call with no descriptors is
# used purely as a 0.25 second delay.
def clear_stale_tasks
  @tasks = @tasks.select(&:status)
  IO.select(nil, nil, nil, 0.25)
end
142
# Writes the completion percentage to the job row identified by @job_id.
#
# pct - progress value stored in the job's +progress+ column
#
# The update runs on a connection checked out from the ActiveRecord pool.
def update_progress(pct)
  ::ActiveRecord::Base.connection_pool.with_connection do
    job_rows = Job.where(id: @job_id)
    job_rows.update_all(progress: pct)
  end
end
148
# Analyzes a single call: dumps its audio to a temp file, runs
# bin/analyze_result.rb on it in a child process and stores the returned
# fields on the call and its media record.
#
# cid - id of the Call record to analyze
# jid - id of the analysis job, stored in the call's analysis_job_id
#
# Returns nil without counting the call when the record no longer exists or
# when the child process produced no usable result; otherwise returns the
# incremented @completed_calls.
def run_analyze_call(cid, jid)

  dr = Call.includes(:job).where(id: cid).first

  # The call list is built up front, so a row may have been deleted by the
  # time a worker gets to it; skip it instead of crashing the thread.
  unless dr
    WarVOX::Log.debug("Worker skipping call #{cid}: record no longer exists")
    return
  end

  dr.analysis_started_at = Time.now
  dr.analysis_job_id = jid
  dr.save!

  WarVOX::Log.debug("Worker processing audio for #{dr.number}...")

  bin = File.join(WarVOX::Base, 'bin', 'analyze_result.rb')
  tmp = Tempfile.new("Analysis")
  mr = nil
  begin

    mr = dr.media
    ::File.open(tmp.path, "wb") do |fd|
      fd.write(mr.audio)
    end

    number = dr.number.gsub(/[^0-9a-zA-Z\-\+]+/, '')

    # The command is passed as an argument vector, so no shell is involved
    # and paths containing spaces or quotes cannot break it. The block form
    # closes the pipe even if reading fails.
    # Marshal.load is only applied to the output of our own helper script;
    # it must never be pointed at untrusted data.
    out = begin
      Marshal.load(IO.popen(['nice', bin, tmp.path, number], 'rb') { |pfd| pfd.read })
    rescue ::StandardError
      nil
    end

    return if not out

    # Route each returned field to the media record or to the call itself.
    mf = dr.media_fields
    out.each_key do |k|
      if mf.include?(k.to_s)
        mr[k] = out[k]
      else
        dr[k] = out[k]
      end
    end

    dr.analysis_completed_at = Time.now

  rescue ::Interrupt
  ensure
    tmp.close
    tmp.unlink
  end

  # mr is still nil when the interrupt arrived before the media was loaded.
  mr.save if mr
  dr.save

  @completed_calls += 1
end
195
# Takes the raw file path as an argument, returns a hash
#
# Loads the raw audio, computes its frequency signature and FFT, derives the
# peak frequencies, runs the line-type classifiers, renders the signal and
# spectrum plots with gnuplot and encodes the audio to MP3 with lame.
#
# input - path of the raw audio file
# num   - label used in the plot titles; defaults to the basename of input
#
# Returns nil when input is nil or the file does not exist. Otherwise
# returns a Hash with the keys :fprint, :peak_freq, :peak_freq_data,
# :line_type, :png_big, :png_big_dots, :png_big_freq, :png_sig,
# :png_sig_freq and :mp3.
def self.analyze_call(input, num=nil)

  return if not input
  return if not File.exist?(input)

  num ||= File.basename(input)
  res = {}

  # Every scratch file is registered here so the ensure clause can close and
  # delete all of them, even when one of the steps below raises.
  scratch = []
  mktemp = lambda do |prefix|
    file = Tempfile.new(prefix)
    scratch << file
    file
  end

  #
  # Create the signature database
  #
  raw = WarVOX::Audio::Raw.from_file(input)
  fft = KissFFT.fftr(8192, 8000, 1, raw.samples) || []

  freq = raw.to_freq_sig_arr()

  # Save the signature data
  res[:fprint] = freq

  # Data file for the signal graph: one "<sample index / 8000.0> <value>" pair per line
  datfile = mktemp.call("datfile")
  datfile.write(raw.samples.each_with_index.map { |val, idx| "#{(idx + 1) / 8000.0} #{val}" }.join("\n"))
  datfile.flush

  # Data file for spectrum plotting
  frefile = mktemp.call("frefile")

  # Calculate the peak frequency for the sample: the strongest entry among
  # the top eleven (ranked by integer power) of every FFT slot, ignoring
  # entries whose frequency rounds to 0 or whose power rounds below 1.
  maxf = 0
  maxp = 0
  fft.each do |slot|
    ranked = slot.sort { |a, b| a[1].to_i <=> b[1].to_i }.reverse
    ranked[0..10].each do |pair|
      next if pair[0].round == 0
      next if pair[1].round < 1
      next unless pair[1] > maxp
      maxf = pair[0]
      maxp = pair[1]
    end
  end

  # Save the peak frequency
  res[:peak_freq] = maxf

  # Calculate average frequency and peaks over time
  avg = Hash.new(0)
  pks = []
  pkz = []
  fft.each do |slot|
    ranked = slot.sort { |a, b| a[1] <=> b[1] }.reverse
    pks << ranked[0]
    pkz << ranked[0..9]
    slot.each { |f| avg[f[0]] += f[1] }
  end

  # Save the peak frequencies over time
  res[:peak_freq_data] = pks.map { |f| "#{f[0]}-#{f[1]}" }.join(" ")

  # Generate the frequency file
  avg.keys.sort.each do |k|
    frefile.write("#{k} #{avg[k] / fft.length}\n")
  end
  frefile.flush

  # Count significant frequencies across the sample, in 5-unit buckets from 0 to 4000
  fcnt = {}
  0.step(4000, 5) { |f| fcnt[f] = 0 }
  pkz.each do |fb|
    fb.each do |f|
      fdx = ((f[0] / 5.0).round * 5.0).to_i
      fcnt[fdx] += 0.1
    end
  end

  #
  # Classifier processing
  #

  sproc = Classifier.new
  sproc.data =
  {
    raw: raw,
    freq: freq,
    fcnt: fcnt,
    fft: fft,
    pks: pks,
    pkz: pkz,
    maxf: maxf,
    maxp: maxp
  }

  # Signature files are tried in order until one of them sets a line type.
  # They are evaluated as Ruby code, so any kind of exception can come out
  # of them; a failing signature is reported and skipped.
  WarVOX::Config.classifiers_load.each do |sigfile|
    begin
      str = File.read(sigfile, File.size(sigfile))
      sproc.proc(str)
    rescue ::Exception => e
      $stderr.puts "DEBUG: Caught exception in #{sigfile}: #{e} #{e.backtrace}"
    end
    break if sproc.line_type
  end

  # Save the guessed line type
  res[:line_type] = sproc.line_type

  png_big      = mktemp.call("big")
  png_big_dots = mktemp.call("bigdots")
  png_big_freq = mktemp.call("bigfreq")
  png_sig      = mktemp.call("signal")
  png_sig_freq = mktemp.call("sigfreq")

  # Plot samples to a graph
  plotter = mktemp.call("gnuplot")
  plotter.puts([
    # Full-size signal plots (lines and dots)
    'set autoscale',
    'set yrange [-15000:15000]',
    'set ylabel "Signal"',
    'set xlabel "Seconds"',
    'set terminal png medium size 640,480 transparent',
    "set output \"#{png_big.path}\"",
    "plot \"#{datfile.path}\" using 1:2 title \"#{num}\" with lines",
    "set output \"#{png_big_dots.path}\"",
    "plot \"#{datfile.path}\" using 1:2 title \"#{num}\" with dots",

    # Full-size spectrum plot
    'set autoscale'.sub(/\A/, "unset yrange\n"),
    'set xrange [0:4000]',
    'set terminal png medium size 640,480 transparent',
    'set ylabel "Power"',
    'set xlabel "Frequency"',
    "set output \"#{png_big_freq.path}\"",
    "plot \"#{frefile.path}\" using 1:2 title \"#{num} - Peak #{maxf.round}hz\" with lines",

    # Thumbnail signal plot
    'unset xrange',
    'set autoscale',
    'set yrange [-15000:15000]',
    'unset border',
    'unset xtics',
    'unset ytics',
    'set ylabel ""',
    'set xlabel ""',
    'set terminal png small size 80,60 transparent',
    "set format x ''",
    "set format y ''",
    "set output \"#{png_sig.path}\"",
    "plot \"#{datfile.path}\" using 1:2 notitle with lines",

    # Thumbnail spectrum plot
    'unset yrange',
    'set autoscale',
    'set xrange [0:4000]',
    'unset border',
    'unset xtics',
    'unset ytics',
    'set ylabel ""',
    'set xlabel ""',
    'set terminal png small size 80,60 transparent',
    "set format x ''",
    "set format y ''",
    "set output \"#{png_sig_freq.path}\"",
    "plot \"#{frefile.path}\" using 1:2 notitle with lines"
  ])
  plotter.flush

  system("#{WarVOX::Config.tool_path('gnuplot')} #{plotter.path}")

  # Collect the rendered images
  [
    [:png_big, png_big],
    [:png_big_dots, png_big_dots],
    [:png_big_freq, png_big_freq],
    [:png_sig, png_sig],
    [:png_sig_freq, png_sig_freq]
  ].each do |key, file|
    ::File.open(file.path, 'rb') { |fd| res[key] = fd.read }
  end

  tmp_wav = mktemp.call("wav")
  tmp_mp3 = mktemp.call("mp3")

  # Generate a WAV file from raw linear PCM
  ::File.open(tmp_wav.path, "wb") do |fd|
    fd.write(raw.to_wav)
  end

  # Encode the WAV to MP3 with lame (-b 32)
  system("#{WarVOX::Config.tool_path('lame')} -b 32 #{tmp_wav.path} #{tmp_mp3.path} >/dev/null 2>&1")

  ::File.open(tmp_mp3.path, "rb") { |fd| res[:mp3] = fd.read }

  clear_zombies()

  res
ensure
  # scratch is nil when one of the guard clauses returned early.
  scratch.each { |file| file.close! } if scratch
end
419 end
420
421
422 end
423 end