blazer.rb 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  require "csv"
  require "erb"
  require "yaml"
  require "chartkick"
  require "safely/core"
  require "blazer/version"
  require "blazer/data_source"
  require "blazer/result"
  require "blazer/run_statement"
  require "blazer/adapters/base_adapter"
  require "blazer/adapters/elasticsearch_adapter"
  require "blazer/adapters/mongodb_adapter"
  require "blazer/adapters/sql_adapter"
  require "blazer/engine"
  # Top-level namespace for the Blazer engine: global configuration, data source
  # loading and the scheduled check runner.
  module Blazer
    class Error < StandardError; end
    class TimeoutNotSupported < Error; end

    # Global configuration, set from an application initializer.
    class << self
      attr_accessor :audit
      attr_reader :time_zone
      attr_accessor :user_name
      attr_accessor :user_class
      attr_accessor :user_method
      attr_accessor :before_action
      attr_accessor :from_email
      attr_accessor :cache
      # Optional callable invoked as call(data_source, statement) before a check runs.
      attr_accessor :transform_statement
      attr_accessor :check_schedules
      attr_accessor :anomaly_checks
      attr_accessor :async
      attr_accessor :images
    end

    # Defaults.
    self.audit = true
    self.user_name = :name
    self.check_schedules = ["5 minutes", "1 hour", "1 day"]
    self.anomaly_checks = false
    self.async = false
    self.images = false

    TIMEOUT_MESSAGE = "Query timed out :("

    # Database error messages treated as query timeouts (source database noted per
    # entry). Presumably matched against adapter error text elsewhere — confirm.
    TIMEOUT_ERRORS = [
      "canceling statement due to statement timeout", # postgres
      "canceling statement due to conflict with recovery", # postgres
      "cancelled on user's request", # redshift
      "canceled on user's request", # redshift
      "system requested abort", # redshift
      "maximum statement execution time exceeded" # mysql
    ]

    # Extra belongs_to options: Rails 5+ gets `optional: true`, older Rails gets none.
    # NOTE(review): presumably splatted into belongs_to in the models; verify.
    BELONGS_TO_OPTIONAL = {}
    BELONGS_TO_OPTIONAL[:optional] = true if Rails::VERSION::MAJOR >= 5

    # Sets the time zone. An ActiveSupport::TimeZone is stored as-is; any other
    # value is converted with #to_s and looked up via ActiveSupport::TimeZone[].
    def self.time_zone=(time_zone)
      @time_zone = time_zone.is_a?(ActiveSupport::TimeZone) ? time_zone : ActiveSupport::TimeZone[time_zone.to_s]
    end

    # Parsed contents of config/blazer.yml (rendered through ERB first), memoized.
    # Returns {} when the file does not exist or is empty.
    def self.settings
      @settings ||= begin
        path = Rails.root.join("config", "blazer.yml").to_s
        if File.exist?(path)
          yaml = ERB.new(File.read(path)).result
          # Psych 4 (Ruby 3.1+) made YAML.load safe-by-default, which rejects aliases
          # such as `<<: *default`. This file is trusted application config, so keep
          # the full loader where it exists.
          loaded = YAML.respond_to?(:unsafe_load) ? YAML.unsafe_load(yaml) : YAML.load(yaml)
          # An empty file parses to false/nil; normalize so callers can index it.
          loaded || {}
        else
          {}
        end
      end
    end

    # Hash of data source id => Blazer::DataSource built from settings["data_sources"],
    # memoized. Lookups of an unknown id return the first configured data source.
    # Yields an empty hash (default nil) when no data sources are configured.
    def self.data_sources
      @data_sources ||= begin
        configured = settings["data_sources"] || {}
        ds = configured.map { |id, s| [id, Blazer::DataSource.new(id, s)] }.to_h
        ds.default = ds.values.first
        ds
      end
    end

    # Runs every check, or only those whose schedule equals +schedule+ when given.
    # Checks in the "disabled" state are skipped. Each check runs inside
    # Safely.safely (safely gem) so an exception is handled per check; see that gem
    # for which environments re-raise.
    def self.run_checks(schedule: nil)
      checks = Blazer::Check.includes(:query)
      checks = checks.where(schedule: schedule) if schedule
      checks.find_each do |check|
        next if check.state == "disabled"
        Safely.safely { run_check(check) }
      end
    end

    # Runs a single check's query and records the result via check.update_state.
    #
    # Timed-out results and "PG::ConnectionBad" errors are retried (reconnecting the
    # data source first for the latter) until +attempts+ runs have been made. The
    # method sleeps +retry_delay+ seconds between attempts, but not after the last
    # one. The query always runs at least once.
    #
    # Wrapped in a "run_check.blazer" ActiveSupport notification. The payload
    # carries check_id, query_id and state_was, plus statement, data_source, state,
    # rows (row count), error and tries (number of attempts actually made).
    #
    # @param check [Blazer::Check]
    # @param attempts [Integer] maximum number of runs (default 3)
    # @param retry_delay [Numeric] seconds to sleep before each retry (default 10)
    def self.run_check(check, attempts: 3, retry_delay: 10)
      tries = 0

      ActiveSupport::Notifications.instrument("run_check.blazer", check_id: check.id, query_id: check.query.id, state_was: check.state) do |instrument|
        data_source = data_sources[check.query.data_source]
        statement = check.query.statement
        # The return value is discarded, so only in-place changes to +statement+ take effect.
        Blazer.transform_statement.call(data_source, statement) if Blazer.transform_statement

        result = nil
        loop do
          tries += 1
          result = data_source.run_statement(statement, refresh_cache: true, check: check, query: check.query)

          if result.timed_out?
            Rails.logger.info "[blazer timeout] query=#{check.query.name}"
          elsif result.error.to_s.start_with?("PG::ConnectionBad")
            data_source.reconnect
            Rails.logger.info "[blazer reconnect] query=#{check.query.name}"
          else
            break
          end

          # Give up without the pointless pause once the last attempt has failed.
          break if tries >= attempts
          sleep(retry_delay)
        end

        check.update_state(result)
        row_count = result.rows.try(:size)
        # TODO use proper logfmt
        Rails.logger.info "[blazer check] query=#{check.query.name} state=#{check.state} rows=#{row_count} error=#{result.error}"
        instrument[:statement] = statement
        instrument[:data_source] = data_source
        instrument[:state] = check.state
        instrument[:rows] = row_count
        instrument[:error] = result.error
        instrument[:tries] = tries
      end
    end

    # Emails each recipient one message listing all of their checks whose state is
    # failing, error, timed out or disabled. Recipients come from check.split_emails,
    # and mail is delivered with deliver_later.
    def self.send_failing_checks
      emails = Hash.new { |hash, email| hash[email] = [] }
      Blazer::Check.includes(:query).where(state: ["failing", "error", "timed out", "disabled"]).find_each do |check|
        check.split_emails.each { |email| emails[email] << check }
      end
      emails.each do |email, checks|
        Blazer::CheckMailer.failing_checks(email, checks).deliver_later
      end
    end
  end