data_source.rb 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  require "digest/md5"
  require "forwardable"
  module Blazer
    # A single configured data source: holds its settings, instantiates the
    # matching adapter, and runs statements through it with optional result
    # caching backed by Blazer.cache.
    class DataSource
      extend Forwardable

      # Characters removed from a check's e-mail list before it is embedded in
      # the statement comment. The whitelist covers typical comma-separated
      # addresses and drops anything that could open or close a comment.
      UNSAFE_EMAIL_CHARS = /[^a-zA-Z0-9 @.,+_\-]/.freeze

      # NOTE: `adapter` is intentionally not listed here; it is defined as a
      # method below (listing it too only triggers a "method redefined" warning).
      attr_reader :id, :settings, :adapter_instance

      def_delegators :adapter_instance, :schema, :tables, :preview_statement, :reconnect, :cost, :explain, :cancel

      # @param id [Object] identifier of this data source (used in cache keys and as the default name)
      # @param settings [Hash] string-keyed settings ("url", "adapter", "cache", ...)
      # @raise [Blazer::Error] when "url" is missing outside of development,
      #   or when the adapter name is not recognized
      def initialize(id, settings)
        @id = id
        @settings = settings

        # Rails.env is only consulted when no url is configured.
        if !settings["url"] && !Rails.env.development?
          raise Blazer::Error, "Empty url"
        end

        @adapter_instance =
          case adapter
          when "elasticsearch"
            Blazer::Adapters::ElasticsearchAdapter.new(self)
          when "mongodb"
            Blazer::Adapters::MongodbAdapter.new(self)
          when "presto"
            Blazer::Adapters::PrestoAdapter.new(self)
          when "sql"
            Blazer::Adapters::SqlAdapter.new(self)
          else
            raise Blazer::Error, "Unknown adapter"
          end
      end

      # @return [String] the explicitly configured adapter name, else one derived from the url
      def adapter
        settings["adapter"] || detect_adapter
      end

      # @return [Object] the configured display name, falling back to the id
      def name
        settings["name"] || @id
      end

      # @return [Hash] the "linked_columns" setting, or an empty hash
      def linked_columns
        settings["linked_columns"] || {}
      end

      # @return [Hash] the "smart_columns" setting, or an empty hash
      def smart_columns
        settings["smart_columns"] || {}
      end

      # @return [Hash] the "smart_variables" setting, or an empty hash
      def smart_variables
        settings["smart_variables"] || {}
      end

      # @return [Hash] the "variable_defaults" setting, or an empty hash
      def variable_defaults
        settings["variable_defaults"] || {}
      end

      # @return [Object, nil] the raw "timeout" setting
      def timeout
        settings["timeout"]
      end

      # Normalized cache configuration (memoized).
      #
      # * a Hash setting is used as-is
      # * any other truthy setting means mode "all" with that value as "expires_in"
      # * a missing/false setting means mode "off"
      #
      # @return [Hash]
      def cache
        @cache ||=
          case (setting = settings["cache"])
          when Hash
            setting
          when nil, false
            { "mode" => "off" }
          else
            { "mode" => "all", "expires_in" => setting }
          end
      end

      # @return [String, nil] the cache mode; compared against "off", "all" and "slow" in this class
      def cache_mode
        cache["mode"]
      end

      # @return [Float] cache lifetime (default 60); multiplied by 60 before
      #   being handed to the cache store as :expires_in
      def cache_expires_in
        (cache["expires_in"] || 60).to_f
      end

      # @return [Float] minimum run duration, in seconds, for a result to be
      #   cached in "slow" mode (default 15)
      def cache_slow_threshold
        (cache["slow_threshold"] || 15).to_f
      end

      # @return [Array] the "local_time_suffix" setting coerced to an array (memoized)
      def local_time_suffix
        @local_time_suffix ||= Array(settings["local_time_suffix"])
      end

      # Reads and deserializes a cached result.
      #
      # @param cache_key [String]
      # @return [Blazer::Result, nil] nil when nothing is stored under the key
      def read_cache(cache_key)
        value = Blazer.cache.read(cache_key)
        return unless value

        # NOTE(review): Marshal.load trusts the cache contents completely; the
        # cache store must not be writable by untrusted parties.
        Blazer::Result.new(self, *Marshal.load(value), nil)
      end

      # @param run_id [Object]
      # @return [Blazer::Result, nil] the stored result of the given run, if any
      def run_results(run_id)
        read_cache(run_cache_key(run_id))
      end

      # Removes the stored result of the given run.
      def delete_results(run_id)
        Blazer.cache.delete(run_cache_key(run_id))
      end

      # Runs a statement, serving it from the cache when caching is enabled and
      # a cached result exists.
      #
      # @param statement [#to_s]
      # @param options [Hash] recognized keys: :user, :query, :check, :run_id,
      #   :async, :refresh_cache
      # @return [Blazer::Result]
      def run_statement(statement, options = {})
        run_id = options[:run_id]

        result = nil
        if cache_mode != "off" && !options[:refresh_cache]
          result = read_cache(statement_cache_key(statement))
        end

        # The run id is only forwarded (so the result gets stored per run) for async runs.
        result || run_statement_helper(statement, build_comment(options), options[:async] ? run_id : nil)
      end

      # Removes the cached result for a statement.
      def clear_cache(statement)
        Blazer.cache.delete(statement_cache_key(statement))
      end

      # @param key [Array] key segments
      # @return [String] the segments joined with "/" under the "blazer/v4" prefix
      def cache_key(key)
        (["blazer", "v4"] + key).join("/")
      end

      # @return [String] cache key for a statement; CRLF line endings are
      #   normalized to LF before hashing so both forms share one entry
      def statement_cache_key(statement)
        cache_key(["statement", id, Digest::MD5.hexdigest(statement.to_s.gsub("\r\n", "\n"))])
      end

      # @return [String] cache key under which a run's result is stored
      def run_cache_key(run_id)
        cache_key(["run", run_id])
      end

      protected

      # Executes the statement through the adapter, then stores the result in
      # the statement cache (when the cache mode allows it) and/or under the
      # run key (when a run id is given).
      #
      # @param statement [#to_s]
      # @param comment [String] passed through to the adapter
      # @param run_id [Object, nil]
      # @return [Blazer::Result]
      def run_statement_helper(statement, comment, run_id)
        # Monotonic clock: the duration must not be affected by wall-clock adjustments.
        started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
        columns, rows, error = @adapter_instance.run_statement(statement, comment)
        duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at

        cacheable = !error && (cache_mode == "all" || (cache_mode == "slow" && duration >= cache_slow_threshold))

        cache_data = nil
        if cacheable || run_id
          cache_data =
            begin
              Marshal.dump([columns, rows, error, cacheable ? Time.now : nil])
            rescue StandardError
              # Best effort: results that cannot be serialized are simply not stored.
              nil
            end
        end

        if cacheable && cache_data
          Blazer.cache.write(statement_cache_key(statement), cache_data, expires_in: cache_expires_in * 60)
        end

        if run_id
          unless cache_data
            # Serialization failed: store (and return) an error so the run does not look pending forever.
            error = "Error storing the results of this query :("
            cache_data = Marshal.dump([[], [], error, nil])
          end
          Blazer.cache.write(run_cache_key(run_id), cache_data, expires_in: 30.seconds)
        end

        Blazer::Result.new(self, columns, rows, error, nil, cacheable && !cache_data.nil?)
      end

      # Derives the adapter name from the scheme of the configured url.
      #
      # @return [String] "mongodb" or "presto" for those schemes, otherwise "sql"
      def detect_adapter
        scheme = settings["url"].to_s.split("://").first
        case scheme
        when "mongodb", "presto"
          scheme
        else
          "sql"
        end
      end

      private

      # Builds the comma-separated comment describing who/what triggered a run.
      # Free-text values are reduced to a whitelist of characters to prevent
      # injection through the comment.
      #
      # @param options [Hash] the options given to #run_statement
      # @return [String]
      def build_comment(options)
        user = options[:user]
        query = options[:query]
        check = options[:check]

        parts = ["blazer"]
        parts << "user_id:#{user.id}" if user.respond_to?(:id)
        if user.respond_to?(Blazer.user_name)
          # only include letters, numbers, and spaces to prevent injection
          parts << "user_name:#{user.send(Blazer.user_name).to_s.gsub(/[^a-zA-Z0-9 ]/, "")}"
        end
        parts << "query_id:#{query.id}" if query.respond_to?(:id)
        if check
          parts << "check_id:#{check.id}"
          parts << "check_emails:#{check.emails.to_s.gsub(UNSAFE_EMAIL_CHARS, "")}"
        end
        parts << "run_id:#{options[:run_id]}" if options[:run_id]
        parts.join(",")
      end
    end
  end