module Blazer class DataSource attr_reader :id, :settings, :connection_model def initialize(id, settings) @id = id @settings = settings @connection_model = Class.new(Blazer::Connection) do def self.name "Blazer::Connection::#{object_id}" end establish_connection(settings["url"]) if settings["url"] end end def name settings["name"] || @id end def linked_columns settings["linked_columns"] || {} end def smart_columns settings["smart_columns"] || {} end def smart_variables settings["smart_variables"] || {} end def timeout settings["timeout"] end def run_statement(statement, options = {}) rows = [] error = nil connection_model.transaction do begin connection_model.connection.execute("SET statement_timeout = #{timeout.to_i * 1000}") if timeout && postgresql? comment = "blazer" if options[:user].respond_to?(:id) comment << ",user_id:#{options[:user].id}" end if options[:query].respond_to?(:id) comment << ",query_id:#{options[:query].id}" end result = connection_model.connection.select_all("#{statement} /*#{comment}*/") result.each do |untyped_row| row = {} untyped_row.each do |k, v| row[k] = result.column_types.empty? ? v : result.column_types[k].send(:type_cast, v) end rows << row end rescue ActiveRecord::StatementInvalid => e error = e.message.sub(/.+ERROR: /, "") ensure raise ActiveRecord::Rollback end end [rows, error] end def tables default_schema = postgresql? ? "public" : connection_model.connection_config[:database] schema = connection_model.connection_config[:schema] || default_schema rows, error = run_statement(connection_model.send(:sanitize_sql_array, ["SELECT table_name, column_name, ordinal_position, data_type FROM information_schema.columns WHERE table_schema = ?", schema])) Hash[rows.group_by { |r| r["table_name"] }.map { |t, f| [t, f.sort_by { |f| f["ordinal_position"] }.map { |f| f.slice("column_name", "data_type") }] }.sort_by { |t, _f| t }] end def postgresql? ["PostgreSQL", "Redshift"].include?(connection_model.connection.adapter_name) end end end