# data_source.rb
  1. require "digest/md5"
  module Blazer
    # One configured data source.
    #
    # Holds the source's id and settings hash, instantiates the adapter class
    # registered under Blazer.adapters for it, and runs statements through that
    # adapter, optionally caching marshalled results in Blazer.cache.
    class DataSource
      extend Forwardable

      # :adapter is intentionally not listed here: #adapter is defined
      # explicitly below, and declaring both only produces a
      # "method redefined" warning.
      attr_reader :id, :settings, :adapter_instance

      # These calls are forwarded unchanged to the adapter instance.
      def_delegators :adapter_instance, :schema, :tables, :preview_statement, :reconnect, :cost, :explain, :cancel

      # @param id the identifier of this data source (used in cache keys and as the default name)
      # @param settings [Hash] string-keyed settings ("url", "adapter", "cache", ...)
      # @raise [Blazer::Error] when no "url" is set (unless Rails is in development
      #   or the adapter is "bigquery"), or when no adapter class is registered
      #   under the resolved adapter name
      def initialize(id, settings)
        @id = id
        @settings = settings

        unless settings["url"] || Rails.env.development? || settings["adapter"] == "bigquery"
          raise Blazer::Error, "Empty url for data source: #{id}"
        end

        adapter_class = Blazer.adapters[adapter]
        raise Blazer::Error, "Unknown adapter" unless adapter_class

        @adapter_instance = adapter_class.new(self)
      end

      # Adapter name: the explicit "adapter" setting, else one derived from the url.
      def adapter
        settings["adapter"] || detect_adapter
      end

      # Display name: the "name" setting, falling back to the id.
      def name
        settings["name"] || @id
      end

      def linked_columns
        settings["linked_columns"] || {}
      end

      def smart_columns
        settings["smart_columns"] || {}
      end

      def smart_variables
        settings["smart_variables"] || {}
      end

      def variable_defaults
        settings["variable_defaults"] || {}
      end

      # The raw "timeout" setting (nil when not configured).
      def timeout
        settings["timeout"]
      end

      # Normalized cache settings (memoized).
      #
      # A Hash under "cache" is used as-is; any other truthy value is taken as
      # the "expires_in" value with mode "all"; otherwise caching is "off".
      def cache
        @cache ||=
          if settings["cache"].is_a?(Hash)
            settings["cache"]
          elsif settings["cache"]
            { "mode" => "all", "expires_in" => settings["cache"] }
          else
            { "mode" => "off" }
          end
      end

      def cache_mode
        cache["mode"]
      end

      # Cache lifetime as a Float, default 60. It is multiplied by 60 before
      # being handed to the cache store (see #run_statement_helper).
      def cache_expires_in
        (cache["expires_in"] || 60).to_f
      end

      # Minimum run time, as a Float (default 15), for a statement to be cached
      # in "slow" mode. Compared against the measured duration in seconds.
      def cache_slow_threshold
        (cache["slow_threshold"] || 15).to_f
      end

      # The "local_time_suffix" setting coerced to an Array (memoized).
      def local_time_suffix
        @local_time_suffix ||= Array(settings["local_time_suffix"])
      end

      # Reads and unmarshals a stored result.
      #
      # @return [Blazer::Result, nil] nil when nothing is stored under cache_key
      def read_cache(cache_key)
        value = Blazer.cache.read(cache_key)
        # NOTE(review): Marshal.load is only safe on trusted data; this assumes
        # Blazer.cache is a store that untrusted parties cannot write to.
        Blazer::Result.new(self, *Marshal.load(value), nil) if value
      end

      # Result stored for the given run id, or nil.
      def run_results(run_id)
        read_cache(run_cache_key(run_id))
      end

      def delete_results(run_id)
        Blazer.cache.delete(run_cache_key(run_id))
      end

      # Runs a statement, serving it from the statement cache when possible.
      #
      # Recognized options:
      #   :refresh_cache - drop the cached entry and run the statement again
      #   :run_id, :async - when both are set, the result is also stored under
      #                     the run's cache key
      #   :user, :query, :check, :run_id - included in the comment handed to
      #                                    the adapter
      #
      # @return [Blazer::Result]
      def run_statement(statement, options = {})
        run_id = options[:run_id]
        result = nil

        if cache_mode != "off"
          if options[:refresh_cache]
            clear_cache(statement) # for checks
          else
            result = read_cache(statement_cache_key(statement))
          end
        end

        # the comment is only built when the statement actually has to run
        result || run_statement_helper(statement, statement_comment(options), options[:async] ? run_id : nil)
      end

      def clear_cache(statement)
        Blazer.cache.delete(statement_cache_key(statement))
      end

      # Joins the given key parts under the "blazer/v4" prefix.
      def cache_key(key)
        (["blazer", "v4"] + key).join("/")
      end

      # Cache key for a statement on this data source. CRLF line endings are
      # normalized to LF before hashing so they map to the same key.
      def statement_cache_key(statement)
        cache_key(["statement", id, Digest::MD5.hexdigest(statement.to_s.gsub("\r\n", "\n"))])
      end

      def run_cache_key(run_id)
        cache_key(["run", run_id])
      end

      protected

      # Builds the comment string passed to the adapter alongside the statement.
      def statement_comment(options)
        # dup so the string literal itself is never mutated
        # (also keeps this working with frozen string literals)
        comment = "blazer".dup

        user = options[:user]
        comment << ",user_id:#{user.id}" if user.respond_to?(:id)
        if user.respond_to?(Blazer.user_name)
          # only include letters, numbers, and spaces to prevent injection
          comment << ",user_name:#{user.send(Blazer.user_name).to_s.gsub(/[^a-zA-Z0-9 ]/, "")}"
        end

        comment << ",query_id:#{options[:query].id}" if options[:query].respond_to?(:id)

        check = options[:check]
        comment << ",check_id:#{check.id},check_emails:#{check.emails}" if check

        comment << ",run_id:#{options[:run_id]}" if options[:run_id]
        comment
      end

      # Executes the statement on the adapter and stores the outcome.
      #
      # The result is written to the statement cache when there was no error,
      # the cache mode allows it ("all", or "slow" and the run took at least
      # cache_slow_threshold), and the adapter reports the statement as
      # cachable. When run_id is given, the result is also written under the
      # run's cache key.
      def run_statement_helper(statement, comment, run_id)
        # monotonic clock: immune to wall-clock adjustments while timing
        started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
        columns, rows, error = @adapter_instance.run_statement(statement, comment)
        duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at

        should_cache = !error && (cache_mode == "all" || (cache_mode == "slow" && duration >= cache_slow_threshold))

        cache_data = nil
        if should_cache || run_id
          cache_data =
            begin
              Marshal.dump([columns, rows, error, should_cache ? Time.now : nil])
            rescue StandardError
              # best effort: results that cannot be marshalled are simply not stored
              nil
            end
        end

        if should_cache && cache_data && @adapter_instance.cachable?(statement)
          Blazer.cache.write(statement_cache_key(statement), cache_data, expires_in: cache_expires_in * 60)
        end

        if run_id
          unless cache_data
            # the reassigned error is also what the returned result reports
            error = "Error storing the results of this query :("
            cache_data = Marshal.dump([[], [], error, nil])
          end
          Blazer.cache.write(run_cache_key(run_id), cache_data, expires_in: 30.seconds)
        end

        Blazer::Result.new(self, columns, rows, error, nil, should_cache && !cache_data.nil?)
      end

      # Derives the adapter name from the url scheme: "mongodb" and "presto"
      # map to themselves, anything else (including a missing url) to "sql".
      def detect_adapter
        scheme = settings["url"].to_s.split("://").first
        case scheme
        when "mongodb", "presto"
          scheme
        else
          "sql"
        end
      end
    end
  end