result.rb 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  module Blazer
    # The outcome of running a statement against a data source: the column
    # names, the row data (each row is indexed by column position), an error
    # value, and caching metadata. Also derives per-column types, a suggested
    # chart type, smart-column lookup tables and anomaly detection.
    class Result
      attr_reader :data_source, :columns, :rows, :error, :cached_at, :just_cached

      # @param data_source the data source the statement ran against; must
      #   respond to +settings+, +smart_columns+ and +run_statement+
      # @param columns [Array] column names, in result order
      # @param rows [Array<Array>] row values, positionally matching +columns+
      # @param error error value for the run (compared against
      #   Blazer::TIMEOUT_MESSAGE by #timed_out?)
      # @param cached_at value whose presence marks the result as cached
      # @param just_cached stored as-is and exposed through the reader
      def initialize(data_source, columns, rows, error, cached_at, just_cached)
        @data_source = data_source
        @columns = columns
        @rows = rows
        @error = error
        @cached_at = cached_at
        @just_cached = just_cached
      end

      # @return [Boolean] whether the error equals Blazer::TIMEOUT_MESSAGE
      def timed_out?
        error == Blazer::TIMEOUT_MESSAGE
      end

      # @return [Boolean] whether +cached_at+ is present
      def cached?
        cached_at.present?
      end

      # Lookup tables for smart columns, memoized.
      #
      # For every column that has a smart-column definition (on this data
      # source or on one named in its "inherit_smart_settings" setting), maps
      # the stringified raw value to its replacement. A Hash definition is used
      # directly; any other definition is treated as a SQL template whose
      # "{value}" placeholder receives the distinct non-nil values of the column.
      #
      # @return [Hash{Object => Hash}] column name => { value string => label }
      def boom
        @boom ||= columns.each_with_index.each_with_object({}) do |(key, i), lookups|
          source = smart_columns_source(key)
          next unless source

          query = source.smart_columns[key]
          pairs =
            if query.is_a?(Hash)
              query
            else
              values = rows.map { |r| r[i] }.compact.uniq
              statement = ActiveRecord::Base.send(:sanitize_sql_array, [query.sub("{value}", "(?)"), values])
              source.run_statement(statement).rows
            end

          # Keys are compared as strings later on; nil keys are kept as nil.
          lookups[key] = pairs.map { |k, v| [k&.to_s, v] }.to_h
        end
      end

      # Type of each column, inferred from its first non-nil value, memoized.
      #
      # @return [Array<String, nil>] one of "string", "numeric", "time", or nil
      #   (nil when the column holds no non-nil value), per column
      def column_types
        @column_types ||= columns.each_with_index.map do |k, i|
          # Look for the first non-nil value rather than the first truthy one,
          # so that a column containing only `false` is still typed.
          sample_row = rows.find { |r| !r[i].nil? }
          v = sample_row && sample_row[i]

          if boom[k]
            # Smart columns are always rendered through their lookup table.
            "string"
          else
            case v
            when nil then nil
            when Numeric then "numeric"
            when Time, Date then "time"
            else "string"
            end
          end
        end
      end

      # Chart suggestion derived from #column_types, memoized when non-nil.
      #
      # "line" ignores untyped (nil) columns when matching; "bar" does not, so
      # a result with an untyped column is never a "bar".
      #
      # @return [String, nil] "line", "line2", "bar", "bar2", or nil
      def chart_type
        @chart_type ||= begin
          types = column_types
          typed = types.compact
          numeric_tail = ["numeric"] * [typed.size - 1, 0].max

          if typed.size >= 2 && typed == ["time"] + numeric_tail
            "line"
          elsif types == ["time", "string", "numeric"]
            "line2"
          elsif typed.size >= 2 && types == ["string"] + numeric_tail
            "bar"
          elsif types == ["string", "string", "numeric"]
            "bar2"
          end
        end
      end

      # Runs anomaly detection over every series of a time-based result.
      #
      # @return [Array(Boolean or nil, String)] the anomaly flag and a message.
      #   The flag is nil when detection could not run (no rows, a result that
      #   is not a "line"/"line2" chart, or an error during detection).
      # @raise [StandardError] re-raises detection errors in the Rails
      #   development environment; otherwise they are reported in the message
      def detect_anomaly
        return [nil, "No data"] if rows.empty?
        return [nil, "Bad format"] unless ["line", "line2"].include?(chart_type)

        series = anomaly_series
        anomaly = nil
        message = nil
        current_series = nil

        begin
          anomalies = []
          series.each do |s|
            # Remember which series is being checked so errors can name it.
            current_series = s[:name]
            anomalies << s[:name] if anomaly?(s[:data])
          end

          # `anomalies` holds series names, which may be nil or false (e.g. a
          # NULL group), so test for emptiness rather than truthiness.
          anomaly = !anomalies.empty?
          message =
            if !anomaly
              "No anomalies detected"
            elsif anomalies.size == 1
              "Anomaly detected in #{anomalies.first}"
            else
              "Anomalies detected in #{anomalies.to_sentence}"
            end
        rescue => e
          message = "#{current_series}: #{e.message}"
          raise if Rails.env.development?
        end

        [anomaly, message]
      end

      # Checks whether the most recent point of a series is anomalous by
      # handing the data to the detect_anomalies.R script that sits next to
      # this file.
      #
      # @param series [Array<Array>] [timestamp, value] pairs; pairs with a nil
      #   timestamp are ignored
      # @return [Boolean] whether the last point was flagged
      # @raise [RuntimeError] "R not found" when Rscript is not on the PATH,
      #   "Unknown R error" when the script prints nothing, or the script's
      #   own message when its output carries one in an "x" column
      def anomaly?(series)
        series = series.reject { |v| v[0].nil? }.sort_by { |v| v[0] }

        csv_str =
          CSV.generate do |csv|
            csv << ["timestamp", "count"]
            series.each { |row| csv << row }
          end

        # Series spanning at least two weeks are passed as "ts"; anything
        # shorter (or empty) is passed as "vec".
        type = series.any? && series.last.first.to_time - series.first.first.to_time >= 2.weeks ? "ts" : "vec"

        r_script = %x[which Rscript].chomp
        raise "R not found" if r_script.empty?

        # Pass an argv array so no shell is involved: neither the script path
        # nor the CSV payload needs quoting, and paths with spaces work.
        script_path = File.expand_path("../detect_anomalies.R", __FILE__)
        output = IO.popen([r_script, "--vanilla", script_path, type, csv_str], &:read)
        raise "Unknown R error" if output.empty?

        flagged = CSV.parse(output, headers: true)
        r_error = flagged.first && flagged.first["x"]
        raise r_error if r_error

        if type == "ts"
          flagged.map { |row| Time.parse(row["timestamp"]) }.include?(series.last[0].to_time)
        else
          # Indexes in the script output are compared against the series
          # length, i.e. the position of the last point counting from 1.
          flagged.map { |row| row["index"].to_i }.include?(series.length)
        end
      end

      private

      # First data source (this one, then inherited ones) that defines a smart
      # column for +key+, or nil when none does.
      def smart_columns_source(key)
        inherited = Array(data_source.settings["inherit_smart_settings"]).map { |ds| Blazer.data_sources[ds] }
        ([data_source] + inherited).find { |ds| ds.smart_columns[key] }
      end

      # Splits the rows into named series of [timestamp, value] pairs: one per
      # value column for "line", one per (smart-labelled) group for "line2".
      def anomaly_series
        if chart_type == "line"
          columns[1..-1].each_with_index.map do |name, i|
            { name: name, data: rows.map { |r| [r[0], r[i + 1]] } }
          end
        else
          labels = boom[columns[1]] || {}
          rows.group_by { |r| labels[r[1].to_s] || r[1] }.map do |name, group|
            { name: name, data: group.map { |r| [r[0], r[2]] } }
          end
        end
      end
    end
  end