|
@@ -1,3 +1,5 @@
|
|
|
+require "digest/md5"
|
|
|
+
|
|
|
module Blazer
|
|
|
class DataSource
|
|
|
attr_reader :id, :settings, :connection_model
|
|
@@ -34,48 +36,65 @@ module Blazer
|
|
|
settings["timeout"]
|
|
|
end
|
|
|
|
|
|
+ def cache
|
|
|
+ settings["cache"]
|
|
|
+ end
|
|
|
+
|
|
|
def use_transaction?
|
|
|
settings.key?("use_transaction") ? settings["use_transaction"] : true
|
|
|
end
|
|
|
|
|
|
def run_statement(statement, options = {})
|
|
|
- rows = []
|
|
|
+ rows = nil
|
|
|
error = nil
|
|
|
- comment = "blazer"
|
|
|
- if options[:user].respond_to?(:id)
|
|
|
- comment << ",user_id:#{options[:user].id}"
|
|
|
- end
|
|
|
- if options[:user].respond_to?(Blazer.user_name)
|
|
|
- # only include letters, numbers, and spaces to prevent injection
|
|
|
- comment << ",user_name:#{options[:user].send(Blazer.user_name).to_s.gsub(/[^a-zA-Z0-9 ]/, "")}"
|
|
|
- end
|
|
|
- if options[:query].respond_to?(:id)
|
|
|
- comment << ",query_id:#{options[:query].id}"
|
|
|
+ cached_at = nil
|
|
|
+ if cache
|
|
|
+ cache_key = ["blazer", "v2", id, Digest::MD5.hexdigest(statement)].join("/")
|
|
|
+ value = Blazer.cache.read(cache_key)
|
|
|
+ rows, cached_at = Marshal.load(value) if value
|
|
|
end
|
|
|
|
|
|
- in_transaction do
|
|
|
- begin
|
|
|
- connection_model.connection.execute("SET statement_timeout = #{timeout.to_i * 1000}") if timeout && postgresql?
|
|
|
- result = connection_model.connection.select_all("#{statement} /*#{comment}*/")
|
|
|
- result.each do |untyped_row|
|
|
|
- row = {}
|
|
|
- untyped_row.each do |k, v|
|
|
|
- row[k] = result.column_types.empty? ? v : result.column_types[k].send(:type_cast, v)
|
|
|
+ unless rows
|
|
|
+ rows = []
|
|
|
+
|
|
|
+ comment = "blazer"
|
|
|
+ if options[:user].respond_to?(:id)
|
|
|
+ comment << ",user_id:#{options[:user].id}"
|
|
|
+ end
|
|
|
+ if options[:user].respond_to?(Blazer.user_name)
|
|
|
+ # only include letters, numbers, and spaces to prevent injection
|
|
|
+ comment << ",user_name:#{options[:user].send(Blazer.user_name).to_s.gsub(/[^a-zA-Z0-9 ]/, "")}"
|
|
|
+ end
|
|
|
+ if options[:query].respond_to?(:id)
|
|
|
+ comment << ",query_id:#{options[:query].id}"
|
|
|
+ end
|
|
|
+
|
|
|
+ in_transaction do
|
|
|
+ begin
|
|
|
+ connection_model.connection.execute("SET statement_timeout = #{timeout.to_i * 1000}") if timeout && postgresql?
|
|
|
+ result = connection_model.connection.select_all("#{statement} /*#{comment}*/")
|
|
|
+ result.each do |untyped_row|
|
|
|
+ row = {}
|
|
|
+ untyped_row.each do |k, v|
|
|
|
+ row[k] = result.column_types.empty? ? v : result.column_types[k].send(:type_cast, v)
|
|
|
+ end
|
|
|
+ rows << row
|
|
|
end
|
|
|
- rows << row
|
|
|
+ rescue ActiveRecord::StatementInvalid => e
|
|
|
+ error = e.message.sub(/.+ERROR: /, "")
|
|
|
end
|
|
|
- rescue ActiveRecord::StatementInvalid => e
|
|
|
- error = e.message.sub(/.+ERROR: /, "")
|
|
|
end
|
|
|
+
|
|
|
+ Blazer.cache.write(cache_key, Marshal.dump([rows, Time.now]), expires_in: cache.to_f * 60) if !error && cache
|
|
|
end
|
|
|
|
|
|
- [rows, error]
|
|
|
+ [rows, error, cached_at]
|
|
|
end
|
|
|
|
|
|
def tables
|
|
|
default_schema = postgresql? ? "public" : connection_model.connection_config[:database]
|
|
|
schema = connection_model.connection_config[:schema] || default_schema
|
|
|
- rows, error = run_statement(connection_model.send(:sanitize_sql_array, ["SELECT table_name, column_name, ordinal_position, data_type FROM information_schema.columns WHERE table_schema = ?", schema]))
|
|
|
+ rows, error, cached_at = run_statement(connection_model.send(:sanitize_sql_array, ["SELECT table_name, column_name, ordinal_position, data_type FROM information_schema.columns WHERE table_schema = ?", schema]))
|
|
|
Hash[rows.group_by { |r| r["table_name"] }.map { |t, f| [t, f.sort_by { |f| f["ordinal_position"] }.map { |f| f.slice("column_name", "data_type") }] }.sort_by { |t, _f| t }]
|
|
|
end
|
|
|
|