1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859 |
- module Blazer
- module Adapters
- class CassandraAdapter < BaseAdapter
- def run_statement(statement, comment)
- columns = []
- rows = []
- error = nil
- begin
- response = session.execute("#{statement} /*#{comment}*/")
- rows = response.map { |r| r.values }
- columns = rows.any? ? response.first.keys : []
- rescue => e
- error = e.message
- end
- [columns, rows, error]
- end
- def tables
- session.execute("SELECT table_name FROM system_schema.tables WHERE keyspace_name = '#{keyspace}'").map { |r| r["table_name"] }
- end
- def schema
- result = session.execute("SELECT keyspace_name, table_name, column_name, type, position FROM system_schema.columns WHERE keyspace_name = '#{keyspace}'")
- result.map(&:values).group_by { |r| [r[0], r[1]] }.map { |k, vs| {schema: k[0], table: k[1], columns: vs.sort_by { |v| v[2] }.map { |v| {name: v[2], data_type: v[3]} }} }
- end
- def preview_statement
- "SELECT * FROM {table} LIMIT 10"
- end
- private
- def cluster
- @cluster ||= begin
- require "cassandra"
- options = {hosts: [uri.host]}
- options[:port] = uri.port if uri.port
- options[:username] = uri.user if uri.user
- options[:password] = uri.password if uri.password
- ::Cassandra.cluster(options)
- end
- end
- def session
- @session ||= cluster.connect(keyspace)
- end
- def uri
- @uri ||= URI.parse(data_source.settings["url"])
- end
- def keyspace
- @keyspace ||= uri.path[1..-1]
- end
- end
- end
- end
|