cassandra_adapter.rb 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859
  1. module Blazer
  2. module Adapters
  3. class CassandraAdapter < BaseAdapter
  4. def run_statement(statement, comment)
  5. columns = []
  6. rows = []
  7. error = nil
  8. begin
  9. response = session.execute("#{statement} /*#{comment}*/")
  10. rows = response.map { |r| r.values }
  11. columns = rows.any? ? response.first.keys : []
  12. rescue => e
  13. error = e.message
  14. end
  15. [columns, rows, error]
  16. end
  17. def tables
  18. session.execute("SELECT table_name FROM system_schema.tables WHERE keyspace_name = '#{keyspace}'").map { |r| r["table_name"] }
  19. end
  20. def schema
  21. result = session.execute("SELECT keyspace_name, table_name, column_name, type, position FROM system_schema.columns WHERE keyspace_name = '#{keyspace}'")
  22. result.map(&:values).group_by { |r| [r[0], r[1]] }.map { |k, vs| {schema: k[0], table: k[1], columns: vs.sort_by { |v| v[2] }.map { |v| {name: v[2], data_type: v[3]} }} }
  23. end
  24. def preview_statement
  25. "SELECT * FROM {table} LIMIT 10"
  26. end
  27. private
  28. def cluster
  29. @cluster ||= begin
  30. require "cassandra"
  31. options = {hosts: [uri.host]}
  32. options[:port] = uri.port if uri.port
  33. options[:username] = uri.user if uri.user
  34. options[:password] = uri.password if uri.password
  35. ::Cassandra.cluster(options)
  36. end
  37. end
  38. def session
  39. @session ||= cluster.connect(keyspace)
  40. end
  41. def uri
  42. @uri ||= URI.parse(data_source.settings["url"])
  43. end
  44. def keyspace
  45. @keyspace ||= uri.path[1..-1]
  46. end
  47. end
  48. end
  49. end