# data_source.rb (4.9 KB)

  require "digest/md5"
  require "forwardable"
  module Blazer
    # A single configured data source.
    #
    # Exposes the values found in the data source's settings hash, forwards
    # introspection calls to a lazily built adapter, and runs statements with
    # optional result caching through Blazer.cache.
    class DataSource
      extend Forwardable

      attr_reader :id, :settings

      # These calls are forwarded as-is to the adapter built by #adapter_instance.
      def_delegators :adapter_instance, :schema, :tables, :preview_statement, :reconnect, :cost, :explain, :cancel

      # id       - identifier of the data source (also used in cache keys).
      # settings - Hash of string-keyed settings for this data source.
      def initialize(id, settings)
        @id = id
        @settings = settings
      end

      # Adapter name: the explicit "adapter" setting, else one derived from the url.
      def adapter
        settings["adapter"] || detect_adapter
      end

      # Display name: the "name" setting, falling back to the id.
      def name
        settings["name"] || @id
      end

      def linked_columns
        settings["linked_columns"] || {}
      end

      def smart_columns
        settings["smart_columns"] || {}
      end

      def smart_variables
        settings["smart_variables"] || {}
      end

      def variable_defaults
        settings["variable_defaults"] || {}
      end

      # Raw "timeout" setting (nil when not configured).
      def timeout
        settings["timeout"]
      end

      # Normalized cache settings Hash.
      #
      # A Hash setting is used as given; any other truthy value is treated as
      # the expiry for mode "all"; a missing/false setting means mode "off".
      def cache
        @cache ||=
          if settings["cache"].is_a?(Hash)
            settings["cache"]
          elsif settings["cache"]
            { "mode" => "all", "expires_in" => settings["cache"] }
          else
            { "mode" => "off" }
          end
      end

      def cache_mode
        cache["mode"]
      end

      # Expiry as a Float (default 60). It is multiplied by 60 before being
      # handed to the cache store, so the unit is presumably minutes.
      def cache_expires_in
        (cache["expires_in"] || 60).to_f
      end

      # Minimum duration, in seconds, for a result to be cached in "slow" mode
      # (default 15).
      def cache_slow_threshold
        (cache["slow_threshold"] || 15).to_f
      end

      # "local_time_suffix" setting coerced to an Array (memoized).
      def local_time_suffix
        @local_time_suffix ||= Array(settings["local_time_suffix"])
      end

      # Reads and rebuilds a stored result.
      #
      # Returns a Blazer::Result, or nil when nothing is stored under cache_key.
      def read_cache(cache_key)
        value = Blazer.cache.read(cache_key)
        return unless value

        # NOTE(review): Marshal.load is only safe while the cache store is
        # trusted; it must never be fed data an outside party can write.
        Blazer::Result.new(self, *Marshal.load(value), nil)
      end

      # Result stored for an async run, or nil when absent.
      def run_results(run_id)
        read_cache(run_cache_key(run_id))
      end

      def delete_results(run_id)
        Blazer.cache.delete(run_cache_key(run_id))
      end

      # Runs a statement, serving it from the cache when caching is enabled.
      #
      # statement - the statement to run.
      # options   - Hash; keys read here: :refresh_cache, :async, :run_id,
      #             :user, :query, :check.
      #
      # Returns a Blazer::Result.
      def run_statement(statement, options = {})
        cached = nil
        if cache_mode != "off"
          if options[:refresh_cache]
            clear_cache(statement) # for checks
          else
            cached = read_cache(statement_cache_key(statement))
          end
        end
        return cached if cached

        # the run id is only handed down (and the result stored under it) for async runs
        run_statement_helper(statement, statement_comment(options), options[:async] ? options[:run_id] : nil)
      end

      def clear_cache(statement)
        Blazer.cache.delete(statement_cache_key(statement))
      end

      # Joins the key parts (an Array) under the "blazer/v4" prefix.
      def cache_key(key)
        (["blazer", "v4"] + key).join("/")
      end

      # Cache key for a statement: CRLF line endings are normalized to LF before
      # hashing so the same statement maps to the same key either way.
      def statement_cache_key(statement)
        cache_key(["statement", id, Digest::MD5.hexdigest(statement.to_s.gsub("\r\n", "\n"))])
      end

      def run_cache_key(run_id)
        cache_key(["run", run_id])
      end

      protected

      # Lazily builds the adapter registered under #adapter.
      #
      # Raises Blazer::Error when no url is configured (outside development and
      # for adapters other than bigquery/athena) or when the adapter is unknown.
      def adapter_instance
        @adapter_instance ||= begin
          unless settings["url"] || Rails.env.development? || ["bigquery", "athena"].include?(settings["adapter"])
            raise Blazer::Error, "Empty url for data source: #{id}"
          end

          adapter_class = Blazer.adapters[adapter]
          raise Blazer::Error, "Unknown adapter" unless adapter_class

          adapter_class.new(self)
        end
      end

      # Builds the comment string passed to the adapter along with the statement.
      def statement_comment(options)
        user = options[:user]
        query = options[:query]
        check = options[:check]

        # unary plus yields a mutable string even when string literals are frozen
        comment = +"blazer"
        comment << ",user_id:#{user.id}" if user.respond_to?(:id)
        if user.respond_to?(Blazer.user_name)
          # only include letters, numbers, and spaces to prevent injection
          comment << ",user_name:#{user.send(Blazer.user_name).to_s.gsub(/[^a-zA-Z0-9 ]/, "")}"
        end
        comment << ",query_id:#{query.id}" if query.respond_to?(:id)
        if check
          # NOTE(review): unlike the user name, the check emails are not sanitized here
          comment << ",check_id:#{check.id},check_emails:#{check.emails}"
        end
        comment << ",run_id:#{options[:run_id]}" if options[:run_id]
        comment
      end

      # Executes the statement on the adapter and stores the result as needed.
      #
      # The result is written to the statement cache when there is no error and
      # the cache mode allows it ("all", or "slow" once the duration reaches the
      # threshold), and additionally under the run key when run_id is given.
      #
      # Returns a Blazer::Result whose last argument tells whether the result
      # was serialized for the statement cache by this call.
      def run_statement_helper(statement, comment, run_id)
        # monotonic clock: immune to wall-clock adjustments while the query runs
        started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
        columns, rows, error = adapter_instance.run_statement(statement, comment)
        duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at

        should_cache = !error && (cache_mode == "all" || (cache_mode == "slow" && duration >= cache_slow_threshold))

        cache_data = nil
        if should_cache || run_id
          cache_data =
            begin
              Marshal.dump([columns, rows, error, should_cache ? Time.now : nil])
            rescue StandardError
              # best effort: results that cannot be serialized are simply not stored
              nil
            end
        end

        if should_cache && cache_data && adapter_instance.cachable?(statement)
          Blazer.cache.write(statement_cache_key(statement), cache_data, expires_in: cache_expires_in * 60)
        end

        if run_id
          unless cache_data
            # serialization failed: store (and return) an error for the run instead
            error = "Error storing the results of this query :("
            cache_data = Marshal.dump([[], [], error, nil])
          end
          Blazer.cache.write(run_cache_key(run_id), cache_data, expires_in: 30.seconds)
        end

        Blazer::Result.new(self, columns, rows, error, nil, should_cache && !cache_data.nil?)
      end

      # Derives the adapter from the url scheme; anything other than
      # mongodb/presto (including a missing url) maps to "sql".
      def detect_adapter
        scheme = settings["url"].to_s.split("://").first
        case scheme
        when "mongodb", "presto"
          scheme
        else
          "sql"
        end
      end
    end
  end