|
from __future__ import print_function |
|
|
|
import itertools |
|
import logging |
|
import re |
|
import traceback |
|
|
|
from redash import models |
|
from redash.query_runner import BaseQueryRunner |
|
from redash.query_runner import register |
|
from redash.utils import json_dumps |
|
|
|
logger = logging.getLogger(__name__)

# R prelude evaluated by R.run_query() at the start of every query, right
# after the R global environment has been cleared.  It loads jsonlite,
# lubridate and tidyverse, sets R session options, and defines helpers that
# the user's R code (evaluated afterwards in the same session) can call:
#   df_to_redash(data)   -- serialise a data frame with jsonlite::toJSON to a
#                           {"columns": [...], "rows": [...]} JSON string.
#   get_redash_column / get_redash_type
#                        -- build one column descriptor, mapping the column's
#                           first R class to a Redash type name ("string" for
#                           anything unlisted).
#   redash_to_df(result) -- turn a parsed result back into a tibble (or an
#                           empty data.frame when there are no rows);
#                           presumably fed with the fromJSON(...) values that
#                           replace execute_query()/get_query_result() calls.
# NOTE: this text is R source executed verbatim; edits change runtime behavior.
PSEUDO_PACKAGE = '''
suppressPackageStartupMessages(library(jsonlite))
suppressPackageStartupMessages(library(lubridate))
suppressPackageStartupMessages(library(tidyverse))

options(deparse.max.lines=20,
        dplyr.show_progress=FALSE,
        encoding="UTF-8",
        error=dump.frames,
        keep.source=TRUE,
        max.print=1000,
        readr.show_progress=FALSE,
        scipen=100,
        show.error.locations=100,
        showErrorCalls=TRUE,
        showWarnCalls=TRUE,
        tidyverse.quiet=TRUE,
        warn=1,
        warnPartialMatchArgs=TRUE,
        warnPartialMatchAttr=TRUE,
        warnPartialMatchDollar=TRUE,
        width=10000)

df_to_redash = function(data) {
    columns = lapply(colnames(data), get_redash_column, data=data)
    toJSON(list(columns=columns, rows=data), na="null", auto_unbox=TRUE)
}

get_redash_column = function(name, data) {
    list(name=name, friendly_name=name, type=get_redash_type(data[[name]]))
}

get_redash_type = function(x) {
    switch(class(x)[1],
           logical="boolean",
           integer="integer",
           numeric="float",
           character="string",
           POSIXct="datetime",
           POSIXlt="datetime",
           Date="date",
           "string")

}

redash_to_df = function(result) {
    if (length(result$rows) > 0) {
        data = mutate_if(result$rows, is.character, parse_guess)
        as_tibble(data)[,result$columns$name]
    } else {
        data = data.frame(matrix(ncol=length(result$columns$name), nrow=0))
        setNames(data, result$columns$name)
    }
}
'''
|
|
|
|
|
class PrintLog(object):
    """Tiny file-like sink that keeps everything written to it in memory.

    Used as the R console callback and as the ``file`` for traceback output.
    """

    def __init__(self):
        # Everything received so far, concatenated in write order.
        self.text = ''

    @property
    def lines(self):
        """The captured text split into lines, line terminators removed."""
        captured = self.text
        return captured.splitlines()

    def write(self, text):
        """Append ``text`` to the captured buffer."""
        self.text = self.text + text
|
|
|
|
|
class R(BaseQueryRunner):
    """Query runner that evaluates the query text as R code through rpy2.

    The R code must leave a JSON string in Redash's result format in a
    variable named ``result`` (e.g. built with ``df_to_redash`` from
    ``PSEUDO_PACKAGE``).  Every ``execute_query(...)`` and
    ``get_query_result(...)`` call in the query is evaluated in Python before
    the R code runs and replaced by ``fromJSON(...)`` of its JSON result.
    """

    should_annotate_query = False

    @classmethod
    def configuration_schema(cls):
        # The runner takes no configuration options.
        return {'type': 'object', 'properties': {}}

    @classmethod
    def enabled(cls):
        return True

    @classmethod
    def name(cls):
        return 'R'

    def __init__(self, configuration):
        BaseQueryRunner.__init__(self, configuration)
        # Supplies unique suffixes for the JSON_<n> R globals that hold
        # injected sub-query results.
        self.counter = itertools.count(start=1)
        self.syntax = 'r'

    @staticmethod
    def execute_query(data_source_name, query):
        """Run ``query`` on the data source called ``data_source_name``.

        Returns the runner's result as a JSON string (compare
        ``Python.execute_query``).  Raises ``Exception`` carrying the
        runner's error message when the query fails.
        """
        data_source = models.DataSource.get_by_name(data_source_name)
        data, error = data_source.query_runner.run_query(query, None)
        if error is not None:
            raise Exception(error)
        return data

    @staticmethod
    def get_query_result(query_id):
        """Return the latest stored result of query ``query_id`` as JSON.

        Compare ``Python.get_query_result``.  Raises ``Exception`` when the
        query has no stored result yet.
        """
        query = models.Query.get_by_id(query_id)
        latest = query.latest_query_data
        # Fail with a readable message instead of an AttributeError on None.
        if latest is None or latest.data is None:
            raise Exception("Query does not have results yet.")
        return latest.data

    def test_connection(self):
        # Nothing remote to test: R runs in-process through rpy2.
        pass

    def strip_quotes(self, text):
        """Return ``text`` without one pair of surrounding quotes.

        Accepts ``"..."`` or ``'...'``.  Raises ``ValueError`` for anything
        else, including a lone quote character.
        """
        if len(text) >= 2 and text[0] == text[-1] and text[0] in ('"', "'"):
            return text[1:-1]
        raise ValueError(repr(text))

    def _inject_json(self, data, rinterface):
        """Bind JSON string ``data`` to a fresh ``JSON_<n>`` R global.

        Returns the R expression that parses that global with ``fromJSON``.
        """
        name = 'JSON_{:d}'.format(next(self.counter))
        rinterface.globalenv[name] = rinterface.StrSexpVector((data,))
        return 'fromJSON({}, simplifyVector=TRUE)'.format(name)

    def patch_execute_query(self, query, rinterface):
        """Replace every ``execute_query(name, sql)`` call in ``query``.

        Each call runs in Python via ``execute_query`` and is replaced with
        R code loading its result.  A call extends up to the first ``)``
        that ends a line; both arguments must be quoted string literals.
        """
        flags = re.DOTALL | re.MULTILINE

        def replace(match):
            args = [x.strip() for x in match.group(1).split(",", 1)]
            if len(args) != 2:
                raise ValueError(
                    "execute_query expects a data source name and a query: "
                    "{!r}".format(match.group(0)))
            data_source_name, subquery = [self.strip_quotes(x) for x in args]
            data = self.execute_query(data_source_name, subquery)
            return self._inject_json(data, rinterface)

        # A callable replacement is inserted literally (no backslash escapes),
        # and handles every occurrence, not only the first.
        return re.sub(r'\bexecute_query\((.*?)\)$', replace, query, flags=flags)

    def patch_get_query_result(self, query, rinterface):
        """Replace every ``get_query_result(<id>)`` call in ``query``.

        Each stored result is fetched in Python via ``get_query_result`` and
        replaced with R code loading it.  ``<id>`` must be an integer literal.
        """
        def replace(match):
            data = self.get_query_result(int(match.group(1).strip()))
            return self._inject_json(data, rinterface)

        return re.sub(r'\bget_query_result\((.*?)\)', replace, query)

    @staticmethod
    def _append_log(result, log):
        """Add ``"log": <log>`` as the last member of the JSON object ``result``.

        Only the single closing brace of the top-level object is removed, so
        results whose last member is itself an object remain valid JSON.
        Raises ``ValueError`` if ``result`` does not end with ``}``.
        """
        body = result.rstrip()
        if not body.endswith('}'):
            raise ValueError(
                'result is not a JSON object: {!r}'.format(result[:100]))
        body = body[:-1].rstrip()
        # An empty object "{}" must not get a leading comma.
        separator = '' if body.endswith('{') else ', '
        return u'{}{}"log": {}}}'.format(body, separator, log)

    def run_query(self, query, user):
        """Evaluate ``query`` as R code and return ``(json_string, None)``.

        The last 1000 lines of R console output are attached under ``"log"``.
        Failures are not reported through the error slot.  Instead they yield
        an empty result whose log ends with the Python traceback.
        """
        from rpy2 import rinterface
        from rpy2 import robjects
        print_log = PrintLog()
        try:
            rinterface.initr()
            rinterface.set_writeconsole_regular(print_log.write)
            rinterface.set_writeconsole_warnerror(print_log.write)
            robjects.r('rm(list=ls())')
            robjects.r('gc(verbose=FALSE)')
            robjects.r(PSEUDO_PACKAGE)
            query = self.patch_execute_query(query, rinterface)
            query = self.patch_get_query_result(query, rinterface)
            robjects.r(query)
            result = robjects.r['result'][0]
            # Patch log into JSON string to avoid loading and dumping.
            log = json_dumps(print_log.lines[-1000:])
            return self._append_log(result, log), None
        except Exception as e:
            logger.exception(e)
            traceback.print_exc(file=print_log)
            result = {'rows': [], 'columns': [], 'log': print_log.lines[-1000:]}
            return json_dumps(result), None
        finally:
            # Best-effort cleanup.  A failure here must not replace the
            # result or error payload being returned above.
            try:
                robjects.r('rm(list=ls())')
                robjects.r('gc(verbose=FALSE)')
            except Exception:
                logger.exception("Failed to clean up the R global environment")
|
|
|
|
# Register the R runner with Redash via redash.query_runner.register.
register(R)
# Updated for Redash 8.