diff --git a/aws-code-build/build_config.json b/aws-code-build/build_config.json index 4a06e104c..5396d7ef3 100644 --- a/aws-code-build/build_config.json +++ b/aws-code-build/build_config.json @@ -3,7 +3,6 @@ "base_branch": "master", "ignore": { "paths": [ - "python_client", ".gitignore", ".dockerignore", "LICENSE", @@ -17,7 +16,6 @@ "visualize_task_dependencies.sh", "udf-script-signature-generator", ".github", - "cpp_client", "emulator", "exaslct_scripts", "githooks" diff --git a/cpp_client/README.md b/cpp_client/README.md deleted file mode 100644 index d096badbf..000000000 --- a/cpp_client/README.md +++ /dev/null @@ -1,220 +0,0 @@ -# CPP Client - - -## Table of Contents -1. [Overview](#overview) -2. [Building the client](#building-the-client) -3. [Deploying the client](#deploying-the-client) -4. [Basic Example](#basic-example) -5. [Example of an aggregating UDF](#example-of-an-aggregating-udf) -6. [Example of an emitting UDF](#example-of-an-emitting-udf) - -## Overview -This script language client implements C++ as a -language for EXASOL. Users can provide C++ code in CREATE SCRIPT -statements. The code is compiled on the fly and then executed. - -## Building the client - -In order to build the client example code, the -best strategy is to locally start the very same Linux container that -is later used to run the client in in the EXASOL database. - -For this purpose, we suggest using Docker (https://www.docker.com). - -* First we need to get the Linux on the development machine. For this -purpose, we can either build one ourselfs from scratch like described -in the documentation about the EXASOL Linux container for script -languages, or we simply import the one the is installed in your version of EXASOL. -For instance, if you have EXASOL in a virtual machine with a local ip addree 192.168.56.104 on your development computer and have configured default BucketFS to listen to port 2580 via EXAoperation, the pre-installed Linux container can be imported as follows: - - -``` -docker import http://192.168.56.104:2580/default/EXAClusterOS/ScriptLanguages-2017-01-30.tar.gz mydockname -``` -Now we start the container and share the folder `src` like this: - -``` -docker run -v `pwd`/src:/src --name=mydockname -it mydockname /bin/bash -``` - -`src` is no mounted as `/src` inside the container: - -``` -root@d673f112aaca:~# cd /src -root@d673f112aaca:/src# ls -Makefile cpp.h scriptDTO.h scriptDTOWrapper.h zmqcontainer.proto -cpp.cpp scriptDTO.cc scriptDTOWrapper.cc swigcontainers.h zmqcontainerclient.cc -``` - -Typing `make` downloads some dependencies, builds the client and stores it in `cppclient.tar.gz`. -Note: there may also be some warning regarding the use of a deprecated feature in the jsoncpp library. - -## Deploying the client - -After building the client, exit docker and upload the client into BucketFS. Here we assume that you have created a bucket named `cpp` (with write-password `writepw`) in the default BucketFS: - -``` -curl -vX PUT -T src/cppclient.tar.gz http://w:writepw@192.168.56.104:2580/cpp/cppclient.tar.gz -``` - -Finally, in order to use C++ in SQL, you need to inform the SQL compiler about the new language. To do so in your current SQL session, you need to modify the session/system parameter SCRIPT_LANGUAGES, for instance like this: - -``` -alter session set script_languages = 'PYTHON=builtin_python R=builtin_r JAVA=builtin_java CPP=localzmq+protobuf:///bfsdefault/default/EXAClusterOS/ScriptLanguages-2017-01-30?lang=cpp#buckets/bfsdefault/cpp/cppclient/cppclient'; -``` - -Note: as we are using `alter session`, you need to re-issue the command above when you start a new session. -An alternative would be to use `alter system`. - -## Basic Example -Now C++ is available as script language: - -``` -create or replace cpp scalar script csin(x double) returns double as - -#include - -void run_cpp(SWIGMetadata& meta, SWIGTableIterator& iter, SWIGResultHandler& res) -{ - res.setDouble(0,sin(iter.getDouble(0))); - res.next(); -} - -/ - -select csin(32); -``` - - -### Example of an aggregating UDF -This example shows how to iterate over a incoming set of data by using -the `next()` method of the `SWIGTableIterator`. -Please note that C++ UDFs don't make a difference between "returning" and "emitting" values. -Both behaviors are achieved by setting a result value and then calling `next()` on the `SWIGResultHandler`. -For UDFs that return a value, use the column 0 to put the result value. - -``` -create or replace cpp set script sum_of_squares_cpp(x float) returns float as - -void run_cpp(SWIGMetadata& meta, SWIGTableIterator& iter, SWIGResultHandler& res) -{ - double acc = 0.0; - do { - double current = iter.getDouble(0); - acc += current*current; - } while (iter.next()); - res.setDouble(0,acc); - res.next(); -} -/ -``` - -Let's experiment a litte: - -``` -create or replace python scalar script fill(n int) emits (x float) as -def run(ctx): - for i in range(ctx.n): - ctx.emit(i) -/ - -create or replace table vals as select fill(1000000); - -select sum_of_squares_cpp(x) from vals; -``` -On my laptop, this took 2.3 seconds. - -Here is an alternative version as Python UDF: - -``` -create or replace python set script sum_of_squares_py(x float) returns float as -def run(ctx): - acc = 0.0 - while True: - acc += ctx.x*ctx.x - if not ctx.next(): break - return acc -/ - -select sum_of_squares_py(x) from vals; -``` -On my laptop, this took 5.6 seconds. - -Please note, that while the C++ performance is nice for a UDF, it is nowhere near what pure SQL can do on EXASOL: -``` -select sum(x*x) from vals; -``` -takes 0.1 seconds on my machine. - - -### Example of an emitting UDF -This example shows how to output multiple values for EMITS scripts. -Basically, this is achieved by calling the `next()` method of the `SWIGResultHandler` multiple times. - -``` -create or replace cpp set script duplicate_rows_cpp(x int) emits (x int) as - -void run_cpp(SWIGMetadata& meta, SWIGTableIterator& iter, SWIGResultHandler& res) -{ - do { - int64_t current = iter.getInt64(0); - res.setInt64(0,current); - res.next(); // first emit per input row - res.setInt64(0,current); - res.next(); // second emit per input row - } while (iter.next()); -} -/ - - -select duplicate_rows_cpp(x) from vals; -``` - -On my laptop, this took 2.7 seconds. - -A Python version could look like this: -``` -create or replace python set script duplicate_rows_py(x int) emits (x int) as -def run(ctx): - while True: - ctx.emit(ctx.x) - ctx.emit(ctx.x) - if not ctx.next(): break -/ - - -select duplicate_rows_py(x) from vals; -``` - -For me, this took 11.9 seconds. - -Here is a Lua version: -``` -create or replace lua set script duplicate_rows_lua(x int) emits (x int) as -run = function(ctx) - while true do - ctx.emit(ctx.x) - ctx.emit(ctx.x) - if not ctx.next() then break end - end -end -/ - -select duplicate_rows_lua(x) from vals; -``` - -Please note, that normally Lua scripts tend to be much faster in EXASOL than scripts in other languages, because Lua is tighter integrated with EXASOL. -Therefore it comes as quite a surprise that the Lua version, taking 2.3 seconds on my machine is only slightly faster than the C++ version. Even more so when considering that the current version of the C++ UDF language implementation compiles the C++ code on the fly which has a really large overhead. -Check this: -``` -select duplicate_rows_cpp(1); -``` -while - -``` -select duplicate_rows_py(1); -``` -only takes 0.1 seconds (as does the Lua version) - -So clearly, the overhead of compiling C++ is quite large at the moment! \ No newline at end of file diff --git a/cpp_client/cpp_udf.config b/cpp_client/cpp_udf.config deleted file mode 100644 index e0284f425..000000000 --- a/cpp_client/cpp_udf.config +++ /dev/null @@ -1,2 +0,0 @@ -// Add predefined macros for your project here. For example: -// #define THE_ANSWER 42 diff --git a/cpp_client/cpp_udf.creator b/cpp_client/cpp_udf.creator deleted file mode 100644 index e94cbbd30..000000000 --- a/cpp_client/cpp_udf.creator +++ /dev/null @@ -1 +0,0 @@ -[General] diff --git a/cpp_client/cpp_udf.creator.user b/cpp_client/cpp_udf.creator.user deleted file mode 100644 index 5e83587d2..000000000 --- a/cpp_client/cpp_udf.creator.user +++ /dev/null @@ -1,172 +0,0 @@ - - - - - - EnvironmentId - {d0b2782e-986a-48d5-bcf6-e6043ec5930b} - - - ProjectExplorer.Project.ActiveTarget - 0 - - - ProjectExplorer.Project.EditorSettings - - true - false - true - - Cpp - - CppGlobal - - - - QmlJS - - QmlJSGlobal - - - 2 - UTF-8 - false - 4 - false - 80 - true - true - 1 - true - false - 0 - true - 1 - 8 - true - 1 - true - true - true - false - - - - ProjectExplorer.Project.PluginSettings - - - - ProjectExplorer.Project.Target.0 - - Desktop - Desktop - {d90bcffd-228f-4590-b7f0-b77476077e8d} - 0 - 0 - 0 - - - - 0 - Build - - ProjectExplorer.BuildSteps.Build - - - 0 - Clean - - ProjectExplorer.BuildSteps.Clean - - 2 - false - - Default - Default - GenericProjectManager.GenericBuildConfiguration - - 1 - - - 0 - Deploy - - ProjectExplorer.BuildSteps.Deploy - - 1 - Deploy locally - - ProjectExplorer.DefaultDeployConfiguration - - 1 - - - false - 1000 - - true - - false - false - false - false - true - 0.01 - 10 - true - 1 - 25 - - 1 - true - false - true - valgrind - - 0 - 1 - 2 - 3 - 4 - 5 - 6 - 7 - 8 - 9 - 10 - 11 - 12 - 13 - 14 - - 2 - - - - %{buildDir} - Custom Executable - - ProjectExplorer.CustomExecutableRunConfiguration - 3768 - false - true - false - false - true - - 1 - - - - ProjectExplorer.Project.TargetCount - 1 - - - ProjectExplorer.Project.Updater.FileVersion - 18 - - - Version - 18 - - diff --git a/cpp_client/cpp_udf.files b/cpp_client/cpp_udf.files deleted file mode 100644 index 4c0caee04..000000000 --- a/cpp_client/cpp_udf.files +++ /dev/null @@ -1,9 +0,0 @@ -src/cpp.cpp -src/cpp.h -src/scriptDTO.cc -src/scriptDTO.h -src/scriptDTOWrapper.cc -src/scriptDTOWrapper.h -src/swigcontainers.h -src/zmqcontainer.proto -src/zmqcontainerclient.cc diff --git a/cpp_client/cpp_udf.includes b/cpp_client/cpp_udf.includes deleted file mode 100644 index 0f865a71c..000000000 --- a/cpp_client/cpp_udf.includes +++ /dev/null @@ -1,2 +0,0 @@ -src -src/old diff --git a/cpp_client/src/Makefile b/cpp_client/src/Makefile deleted file mode 100644 index 6209b6acf..000000000 --- a/cpp_client/src/Makefile +++ /dev/null @@ -1,41 +0,0 @@ -#!/bin/sh - -CXXFLAGS=-O3 -I. -Wall -fPIC -pthread -DNDEBUG -std=c++11 -DENABLE_CPP_VM -DEXTERNAL_PROCESS -DBUILDINSWIGDIR - -LDFLAGS=-rdynamic -lzmq -lprotobuf -lpthread -lcrypto -ldl - -all: cppclient.tar.gz - - -jsoncpp-master.zip: - wget -N https://github.com/open-source-parsers/jsoncpp/archive/master.zip -O jsoncpp-master.zip - -jsoncpp-master: jsoncpp-master.zip - unzip jsoncpp-master.zip - cd jsoncpp-master && python amalgamate.py - -zmqcontainer.pb.cc: zmqcontainer.proto - protoc -I. $< --cpp_out=. - -swigcontainers_ext.h: swigcontainers.h - cpp -DEXTERNAL_PROCESS swigcontainers.h | sed 's/^\$$/#/; /^# *[0-9] */d; /^ *$$/d' > "swigcontainers_ext.h" - - -%.o: %.cc zmqcontainer.pb.cc swigcontainers_ext.h scriptDTOWrapper.h scriptDTO.h - g++ -c $(CXXFLAGS) -o $@ $< -DBUILDINSWIGDIR - -%.o: %.c - g++ -c $(CXXFLAGS) -o $@ $< - - -cppclient: jsoncpp-master zmqcontainerclient.o zmqcontainer.pb.o jsoncpp-master/dist/jsoncpp.o swigcontainers_ext.h scriptDTOWrapper.h scriptDTO.o scriptDTOWrapper.o cpp.o - g++ -o cppclient zmqcontainerclient.o zmqcontainer.pb.o jsoncpp-master/dist/jsoncpp.o scriptDTO.o scriptDTOWrapper.o cpp.o $(LDFLAGS) - -cppclient.tar.gz: cppclient jsoncpp-master - tar --transform 's/.*\///g' -zcf cppclient.tar.gz cppclient swigcontainers_ext.h scriptDTOWrapper.h scriptDTO.h zmqcontainer.pb.h jsoncpp-master/dist/json/json.h jsoncpp-master/dist/json/json-forwards.h - -.PHONY: clean -clean: - @rm -f *.o *~ - @rm -f cppclient cppclient.tar.gz zmqcontainer.pb.cc zmqcontainer.pb.h swigcontainers_ext.h - @rm -rf jsoncpp-master jsoncpp-master.zip diff --git a/cpp_client/src/cpp.cpp b/cpp_client/src/cpp.cpp deleted file mode 100644 index 9da5a21b9..000000000 --- a/cpp_client/src/cpp.cpp +++ /dev/null @@ -1,313 +0,0 @@ -#include "cpp.h" -#include -#include -#include -#include -#include "swigcontainers_ext.h" -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -namespace SWIGVMContainers { -std::string getExecutablePath() -{ - char buf[PATH_MAX+1]; - ssize_t count = readlink("/proc/self/exe", buf, PATH_MAX); - if (count>0) - { - buf[count] = '\0'; - return std::string(buf); - } - abort(); -} - -bool mexec(const std::string& cmd_, std::string& result) { - char buffer[128]; - std::stringstream cmd; - cmd << "ulimit -v 500000; "; - cmd << cmd_ << " 2>&1"; - - FILE* pipe = popen(cmd.str().c_str(), "r"); - if (!pipe) { - result = "Cannot start command `" + cmd.str() + "`"; - return false; - } - while (!feof(pipe)) { - if (fgets(buffer, 128, pipe) != NULL) { - result += buffer; - } - } - int s = pclose(pipe); - if (s == -1) - { - return false; - } - if (WEXITSTATUS(s)) - { - return false; - } - return true; -} - -std::string CPPVM::getOptionLine(std::string scriptCode, const std::string option, const std::string whitespace, const std::string lineEnd, size_t& pos) { - std::string result; - size_t startPos = scriptCode.find(option); - if (startPos != std::string::npos) { - size_t firstPos = startPos + option.length(); - firstPos = scriptCode.find_first_not_of(whitespace, firstPos); - if (firstPos == std::string::npos) { - std::stringstream ss; - ss << "No values found for " << option << " statement"; - throw exception(ss.str().c_str()); - } - size_t lastPos = scriptCode.find_first_of(lineEnd + "\r\n", firstPos); - if (lastPos == std::string::npos || scriptCode.compare(lastPos, lineEnd.length(), lineEnd) != 0) { - std::stringstream ss; - ss << "End of " << option << " statement not found"; - throw exception(ss.str().c_str()); - } - if (firstPos >= lastPos) { - std::stringstream ss; - ss << "No values found for " << option << " statement"; - throw exception(ss.str().c_str()); - } - size_t optionsEnd = scriptCode.find_last_not_of(whitespace, lastPos - 1); - if (optionsEnd == std::string::npos || optionsEnd < firstPos) { - std::stringstream ss; - ss << "No values found for " << option << " statement"; - throw exception(ss.str().c_str()); - } - result = scriptCode.substr(firstPos, optionsEnd - firstPos + 1); - scriptCode.erase(startPos, lastPos - startPos + 1); - } - pos = startPos; - return result; -} - -std::vector CPPVM::scriptToMd5(const char *script) { - MD5_CTX ctx; - unsigned char md5[MD5_DIGEST_LENGTH]; - MD5_Init(&ctx); - MD5_Update(&ctx, script, strlen(script)); - MD5_Final(md5, &ctx); - return std::vector(md5, md5 + sizeof(md5)); -} - -void CPPVM::importScripts() { - - const std::string whitespace = " \t\f\v"; - const std::string lineEnd = ";"; - size_t pos; - - // Attention: We must hash the parent script before modifying it (adding the - // package definition). Otherwise we don't recognize if the script imports itself - m_importedScriptChecksums.insert(scriptToMd5(meta.scriptCode())); - while (true) { - std::string scriptName = getOptionLine(meta.scriptCode(), "%import", whitespace, lineEnd, pos); - if (scriptName == "") - break; - - const char *scriptCode = meta.moduleContent(scriptName.c_str()); - const char *exception = meta.checkException(); - if (exception) - throw SWIGVM::exception(exception); - if (m_importedScriptChecksums.insert(scriptToMd5(scriptCode)).second) { - // Script has not been imported yet - // If this imported script contains %import statements - // they will be resolved in this while loop. - m_script_code.insert(pos, scriptCode); - } - } -} - - - -CPPVM::CPPVM(bool) - : - m_importedScriptChecksums(), - m_script_code(SWIGVM_params->script_code) -{ - std::string myPath = getExecutablePath(); - std::string myFolder = myPath.substr(0,myPath.find_last_of('/')); - { - std::stringstream cmd; - cmd << "cp " << myFolder << "/*.h /tmp/"; - - if (::system(cmd.str().c_str())) - { - std::cerr << "Some error when copying header file" << std::endl; - std::cerr << "current dir: " << std::endl; - if (system("pwd")) {} - abort(); - } - } - importScripts(); - const std::string whitespace = " \t\f\v"; - const std::string lineEnd = ";"; - size_t nextOptionPos = 0; - - std::string LDFLAGS = getOptionLine(meta.scriptCode(),"%compilerflags",whitespace,lineEnd,nextOptionPos); - - std::ofstream out("/tmp/code.cc"); - out << "#include \"swigcontainers_ext.h\"" << std::endl; - out << "#include \"scriptDTOWrapper.h\"" << std::endl; - out << "using namespace SWIGVMContainers;\n" << std::endl; - - out << m_script_code << std::endl; - out.close(); - - { - std::stringstream cmd; - cmd << "g++ -O3 -shared -fPIC -o /tmp/libcode.so /tmp/code.cc -DBUILDINSWIGDIR"; - cmd << " -I" << myFolder; - cmd << " " << LDFLAGS; - - std::string msg; - if (!mexec(cmd.str(), msg)) - { - - throw exception(("Error when compiling script code:\n"+cmd.str()+"\n\n"+msg).c_str()); - } - } - -// enable to retrieve function signatures from the EXASOL log file -#if 0 - { - if (::system("nm /tmp/libcode.so")) {} - } -#endif - handle = dlopen("/tmp/libcode.so",RTLD_NOW); - - if (handle == NULL) - { - throw exception( dlerror() ); - }; - - -} - -void CPPVM::shutdown() -{ -} - -bool CPPVM::run() -{ - if (SWIGVM_params->singleCallMode) - { - throw exception("calling RUN in single call mode"); - } - - - if (!run_cpp) { - char *error; - run_cpp = (RUN_FUNC)dlsym(handle, "_Z7run_cppRN16SWIGVMContainers12SWIGMetadataERNS_17SWIGTableIteratorERNS_17SWIGResultHandlerE"); - if ((error = dlerror()) != NULL) { - std::stringstream sb; - sb << "Error when trying to load function \"run_cpp\": " << std::endl << error; - throw exception(sb.str().c_str()); - } - } - - SWIGTableIterator iter; - SWIGResultHandler res(&iter); - - if (meta.inputType()==MULTIPLE) { - // please note: at the moment, SET-RETURNS and SET-EMITS call the same - // C++ function. - if (meta.outputType()==EXACTLY_ONCE) { - // SET-RETURNS - (*run_cpp)(meta,iter,res); - } else { - // SET-EMITS - (*run_cpp)(meta,iter,res); - } - } else { - // please note: at the moment, SCALAR-RETURNS and SCALAR-EMITS call the same - // C++ function. - if (meta.outputType()==EXACTLY_ONCE) { - // SCALAR-RETURNS - while (true) { - (*run_cpp)(meta,iter,res); - if (!iter.next()) break; - } - } else { - // SCALAR-EMITS - while (true) { - (*run_cpp)(meta,iter,res); - if (!iter.next()) break; - } - } - } - - res.flush(); - - - return true; -} - -std::string CPPVM::singleCall(single_call_function_id fn, const ExecutionGraph::ScriptDTO& args) -{ - DEFAULT_OUTPUT_COLUMNS_FUNC defaultOutputColumnsFunc = NULL; - ADAPTER_CALL_FUNC adapterCallFunc = NULL; - IMPORT_ALIAS_FUNC importAliasFunc = NULL; - ExecutionGraph::StringDTO* stringDTO = NULL; - ExecutionGraph::ImportSpecification* importDTO = NULL; - char *error = NULL; - switch (fn) - { - case SC_FN_DEFAULT_OUTPUT_COLUMNS: - defaultOutputColumnsFunc = (DEFAULT_OUTPUT_COLUMNS_FUNC)dlsym(handle, "_Z23getDefaultOutputColumnsB5cxx11RKN9UDFClient8MetadataE"); - if ((error = dlerror()) != NULL) - { - std::stringstream sb; - sb << "Error when trying to load singleCall function: " << std::endl << error; - throw exception(sb.str().c_str()); - } - return (*defaultOutputColumnsFunc)(meta); - break; - case SC_FN_VIRTUAL_SCHEMA_ADAPTER_CALL: - adapterCallFunc = (ADAPTER_CALL_FUNC)dlsym(handle, "_Z11adapterCallRKN9UDFClient8MetadataENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE"); - if ((error = dlerror()) != NULL) - { - std::stringstream sb; - sb << "Error when trying to load singleCall function: " << std::endl << error; - throw exception(sb.str().c_str()); - } - stringDTO = (ExecutionGraph::StringDTO*)&args; - assert(stringDTO != NULL); - return (*adapterCallFunc)(meta,stringDTO->getArg()); - break; - case SC_FN_GENERATE_SQL_FOR_IMPORT_SPEC: - importAliasFunc = (IMPORT_ALIAS_FUNC)dlsym(handle,"_Z24generateSqlForImportSpecB5cxx11RKN9UDFClient8MetadataERKNS_19ImportSpecificationE"); - if ((error = dlerror()) != NULL) - { - std::stringstream sb; - sb << "Error when trying to load singleCall function: " << std::endl << error; - throw exception(sb.str().c_str()); - } - importDTO = (ExecutionGraph::ImportSpecification*)&args; - assert(importDTO != NULL); - return (*importAliasFunc)(meta,*importDTO); - break; - default: - { - std::stringstream sb; - sb << "Unsupported singleCall function id: " << fn; - throw exception(sb.str().c_str()); - } - } - - return "dummy"; -} - - - -} diff --git a/cpp_client/src/cpp.h b/cpp_client/src/cpp.h deleted file mode 100644 index 2cbd45636..000000000 --- a/cpp_client/src/cpp.h +++ /dev/null @@ -1,47 +0,0 @@ -#ifndef CPP_H -#define CPP_H - -#include "swigcontainers_ext.h" -#include -#include -#include -#include - -namespace SWIGVMContainers { -class CPPVM: public SWIGVM { - - typedef void (*RUN_FUNC)(const SWIGVMContainers::SWIGMetadata&, SWIGVMContainers::SWIGTableIterator&, SWIGVMContainers::SWIGResultHandler&); - - typedef std::string (*DEFAULT_OUTPUT_COLUMNS_FUNC)(const SWIGVMContainers::SWIGMetadata&); - typedef std::string (*ADAPTER_CALL_FUNC)(const SWIGVMContainers::SWIGMetadata&, const std::string input); - typedef std::string (*IMPORT_ALIAS_FUNC)(const SWIGVMContainers::SWIGMetadata&, const ExecutionGraph::ImportSpecification& importSpecification); - typedef void (*CLEANUP_FUNC)(); - - RUN_FUNC run_cpp=NULL; - - public: - struct exception: SWIGVM::exception { - exception(const char *reason): SWIGVM::exception(reason) { } - virtual ~exception() throw() { } - }; - - CPPVM(bool checkOnly); - virtual ~CPPVM() {} - virtual void shutdown(); - virtual bool run(); - virtual std::string singleCall(single_call_function_id fn, const ExecutionGraph::ScriptDTO& args); -private: - - void importScripts(); - std::vector scriptToMd5(const char *script); - std::string getOptionLine(std::string scriptCode, const std::string option, const std::string whitespace, const std::string lineEnd, size_t& pos); - - SWIGMetadata meta; - - std::set< std::vector > m_importedScriptChecksums; - - std::string m_script_code; - void* handle; -}; -} -#endif // CPP_H diff --git a/cpp_client/src/scriptDTO.cc b/cpp_client/src/scriptDTO.cc deleted file mode 100644 index 203750009..000000000 --- a/cpp_client/src/scriptDTO.cc +++ /dev/null @@ -1,310 +0,0 @@ -#ifdef BUILDINSWIGDIR -#include "scriptDTO.h" -#else -#include -#endif -#include -#include - -namespace ExecutionGraph -{ - -////////////////////////////// -EmptyDTO::EmptyDTO() -{} - -void EmptyDTO::accept(ScriptDTOSerializer& serializer) const -{ - return; -} - -bool EmptyDTO::isEmpty() const -{ - return true; -} - -////////////////////////////// - - -void StringDTO::accept(ScriptDTOSerializer& serializer) const -{ - serializer.visit(*this); -} - -StringDTO::StringDTO() - : arg("") -{} - -StringDTO::StringDTO(const std::string& arg_) - : arg(arg_) -{} - - -const std::string StringDTO::getArg() const -{ - return arg; -} - -bool StringDTO::isEmpty() const -{ - return false; -} - -////////////////////////////// - - - -void ConnectionInformation::accept(ScriptDTOSerializer& serializer) const -{ - serializer.visit(*this); -} - -ConnectionInformation::ConnectionInformation(const std::string& kind_, const std::string& address_, const std::string& user_, const std::string& password_) - : kind(kind_), - address(address_), - user(user_), - password(password_) -{ -} - -ConnectionInformation::ConnectionInformation(const std::string& address_, const std::string& user_, const std::string& password_) - : kind("password"), - address(address_), - user(user_), - password(password_) -{} - - -ConnectionInformation::ConnectionInformation() - : kind(""), address(""), user(""), password("") -{} - -ConnectionInformation::ConnectionInformation(const ConnectionInformation& other) - : kind(other.getKind()), - address(other.getAddress()), - user(other.getUser()), - password(other.getPassword()) -{} - -const std::string ConnectionInformation::getKind() const -{ - return kind; -} - -const std::string ConnectionInformation::getAddress() const -{ - return address; -} - -const std::string ConnectionInformation::getUser() const -{ - return user; -} - -const std::string ConnectionInformation::getPassword() const -{ - return password; -} - -bool ConnectionInformation::hasData() const -{ - return kind.size() == 0; -} - -bool ConnectionInformation::isEmpty() const -{ - return false; -} - -////////////////////////////// - - -//ImportSpecification::ImportSpecificationError::ImportSpecificationError(const std::string& msg_) -// :msg(msg_) -//{} - -//ImportSpecification::ImportSpecificationError::~ImportSpecificationError() throw() {} - - - -//const char* ImportSpecification::ImportSpecificationError::what() const throw() -//{ -// return msg.c_str(); -//} - - - -ImportSpecification::ImportSpecification() - : isEmptySpec(true) -{} - -ImportSpecification::ImportSpecification(bool isSubselect__) - : isEmptySpec(false), - isSubselect_(isSubselect__), - subselect_column_names(), - subselect_column_types(), - connection_name(""), - connection_information(), - parameters() -{ -} - - -void ImportSpecification::accept(ScriptDTOSerializer& serializer) const -{ - serializer.visit(*this); -} - -bool ImportSpecification::isEmpty() const -{ - return isEmptySpec; -} - - -bool ImportSpecification::isSubselect() const -{ - return isSubselect_; -} -bool ImportSpecification::hasSubselectColumnNames() const -{ - return subselect_column_names.size()>0; -} -bool ImportSpecification::hasSubselectColumnTypes() const -{ - return subselect_column_types.size()>0; -} -bool ImportSpecification::hasSubselectColumnSpecification() const -{ - return hasSubselectColumnNames() || hasSubselectColumnTypes(); -} -bool ImportSpecification::hasConnectionName() const -{ - return connection_name.size()>0; -} -bool ImportSpecification::hasConnectionInformation() const -{ - return connection_information.hasData() == false; -} -bool ImportSpecification::hasParameters() const -{ - return parameters.size()>0; -} -bool ImportSpecification::hasConsistentColumns() const -{ - return (isSubselect() && subselect_column_names.size() == subselect_column_types.size()) || (!isSubselect() && subselect_column_types.size() == 0); -} - -bool ImportSpecification::isCompleteImportSubselectSpecification() const -{ - return hasConsistentColumns() && hasSubselectColumnNames() && hasSubselectColumnTypes(); -} - -bool ImportSpecification::isCompleteImportIntoTargetTableSpecification() const -{ - return hasConsistentColumns(); -} - - - -const std::vector& ImportSpecification::getSubselectColumnNames() const -{ - if (!isSubselect()) - { - throw Error("import specification error: cannot get column names of non-subselect import specification"); - } - return subselect_column_names; -} - -const std::vector& ImportSpecification::getSubselectColumnTypes() const -{ - if (!isSubselect()) - { - throw Error("import specification error: cannot get column types of non-subselect import specification"); - } - - return subselect_column_types; -} - - -void ImportSpecification::appendSubselectColumnName(const std::string& columnName) -{ - if (!isSubselect()) - { - throw Error("import specification error: cannot add column name to non-subselect import specification"); - } - subselect_column_names.push_back(columnName); -} - -void ImportSpecification::appendSubselectColumnType(const std::string& columnType) -{ - if (!isSubselect()) - { - throw Error("import specification error: cannot add column type to non-subselect import specification"); - } - subselect_column_types.push_back(columnType); -} - -void ImportSpecification::setConnectionName(const std::string& connectionName_) -{ - if (hasConnectionName()) - { - throw Error("import specification error: connection name is set more than once"); - } - if (hasConnectionInformation()) - { - throw Error("import specification error: cannot set connection name, because there is already connection information set"); - } - connection_name = connectionName_; -} - -void ImportSpecification::setConnectionInformation(const ConnectionInformation& connectionInformation_) -{ - if (hasConnectionName()) - { - throw Error("import specification error: cannot set connection information, because there is already a connection name set"); - } - if (hasConnectionInformation()) - { - throw Error("import specification error: cannot set connection information more than once"); - } - connection_information = connectionInformation_; -} - - - -void ImportSpecification::addParameter(const std::string& key, const std::string& value) -{ - if (parameters.find(key) != parameters.end()) - { - std::stringstream sb; - sb << "import specification error: parameter with name '" << key << "', is set more than once"; - throw Error(sb.str()); - } - parameters[key] = value; -} - - -const std::string ImportSpecification::getConnectionName() const -{ - if (!hasConnectionName()) - { - throw Error("import specification error: cannot get connection name because it is not set"); - } - return connection_name; -} - -const ConnectionInformation ImportSpecification::getConnectionInformation() const -{ - if (!hasConnectionInformation()) - { - throw Error("import specification error: cannot get connection information because it is not set"); - } - return connection_information; -} - -const std::map& ImportSpecification::getParameters() const -{ - return parameters; -} - -} diff --git a/cpp_client/src/scriptDTO.h b/cpp_client/src/scriptDTO.h deleted file mode 100644 index 673a9f522..000000000 --- a/cpp_client/src/scriptDTO.h +++ /dev/null @@ -1,190 +0,0 @@ -#ifndef SCRIPT_DATA_TRANSFER_OBJECTS_H -#define SCRIPT_DATA_TRANSFER_OBJECTS_H - -#include -#include -#include -#include -#include -#include - -namespace ExecutionGraph -{ - -class DTOError : public std::exception -{ -public: - explicit DTOError(const std::string& msg_) - : msg(msg_) - {} - virtual ~DTOError() throw() {} - virtual const char* what() const throw() - { - return msg.c_str(); - } - -protected: - std::string msg; -}; - - -class ScriptDTOSerializer -{ -public: - virtual void visit(const class ScriptDTO&) {std::abort();} - virtual void visit(const class ImportSpecification&) = 0; - virtual void visit(const class ConnectionInformation&) = 0; - virtual void visit(const class StringDTO&) {std::abort();} // will be refactored. Only supported for Proto Serializer -}; - - -//! -//! -//! -//! - -class ScriptDTO -{ -public: - virtual void accept(ScriptDTOSerializer& serializer) const = 0; - virtual bool isEmpty() const = 0; -}; - -//! -//! -//! -//! - -class EmptyDTO : public ScriptDTO -{ -public: - EmptyDTO(); - - virtual void accept(ScriptDTOSerializer& serializer) const; - virtual bool isEmpty() const; -}; - - -//! -//! -//! -//! - -class ConnectionInformation : public ScriptDTO -{ -public: - class Error : public DTOError - { - public: - explicit Error(const std::string& msg) - : DTOError(msg) - {} - }; - - ConnectionInformation(); - - ConnectionInformation(const ConnectionInformation& other); - - ConnectionInformation(const std::string& kind, - const std::string& address, - const std::string& user, - const std::string& password); - - ConnectionInformation(const std::string& address, - const std::string& user, - const std::string& password); - - virtual void accept(ScriptDTOSerializer& serializer) const; - virtual bool isEmpty() const; - - const std::string getKind() const; - const std::string getAddress() const; - const std::string getUser() const; - const std::string getPassword() const; - - bool hasData() const; - -protected: - std::string kind; - std::string address; - std::string user; - std::string password; - -}; - - -class StringDTO : public ScriptDTO -{ -public: - StringDTO(); - StringDTO(const std::string& arg); - - virtual void accept(ScriptDTOSerializer& serializer) const; - virtual bool isEmpty() const; - - const std::string getArg() const; - -protected: - std::string arg; -}; - - -//! -//! -//! -//! -class ImportSpecification : public ScriptDTO -{ -private: - bool isEmptySpec; -public: - class Error : public DTOError - { - public: - explicit Error(const std::string& msg) - : DTOError(msg) - {} - }; - - virtual void accept(ScriptDTOSerializer& serializer) const; - virtual bool isEmpty() const; - - explicit ImportSpecification(); - explicit ImportSpecification(bool isSubselect__); - - void appendSubselectColumnName(const std::string& columnName); - void appendSubselectColumnType(const std::string& columnType); - void setConnectionName(const std::string& connectionName_); - void setConnectionInformation(const ConnectionInformation& connectionInformation_); - void addParameter(const std::string& key, const std::string& value); - - bool isSubselect() const; - bool hasSubselectColumnNames() const; - bool hasSubselectColumnTypes() const; - bool hasSubselectColumnSpecification() const; - bool hasConnectionName() const; - bool hasConnectionInformation() const; - bool hasParameters() const; - bool hasConsistentColumns() const; - bool isCompleteImportSubselectSpecification() const; - bool isCompleteImportIntoTargetTableSpecification() const; - - const std::vector& getSubselectColumnNames() const; - const std::vector& getSubselectColumnTypes() const; - const std::string getConnectionName() const; - const ConnectionInformation getConnectionInformation() const; - const std::map& getParameters() const; - -protected: - bool isSubselect_; - std::vector subselect_column_names; - std::vector subselect_column_types; - std::string connection_name; - ConnectionInformation connection_information; - std::map parameters; -}; - - -} - -#endif // SCRIPT_DATA_TRANSFER_OBJECTS_H diff --git a/cpp_client/src/scriptDTOWrapper.cc b/cpp_client/src/scriptDTOWrapper.cc deleted file mode 100644 index 1045443ae..000000000 --- a/cpp_client/src/scriptDTOWrapper.cc +++ /dev/null @@ -1,54 +0,0 @@ -#ifdef BUILDINSWIGDIR -#include "scriptDTOWrapper.h" -#else -#include -#endif - -#include - -namespace ExecutionGraph -{ - -ConnectionInformationWrapper::ConnectionInformationWrapper(const ConnectionInformation& connectionInformation_) - : connectionInformation(connectionInformation_) -{} - -char* ConnectionInformationWrapper::copyKind() const { - return strdup(connectionInformation.getKind().c_str()); -} -char* ConnectionInformationWrapper::copyAddress() const {return strdup(connectionInformation.getAddress().c_str());} -char* ConnectionInformationWrapper::copyUser() const {return strdup(connectionInformation.getUser().c_str());} -char* ConnectionInformationWrapper::copyPassword() const {return strdup(connectionInformation.getPassword().c_str());} - - -ImportSpecificationWrapper::ImportSpecificationWrapper(ImportSpecification* importSpecification_) - : importSpecification(importSpecification_) -{ - if (importSpecification != NULL) - { - // extract all the keys and values in some random but fixed order - for (std::map::const_iterator i = importSpecification->getParameters().begin(); - i != importSpecification->getParameters().end(); - ++i) - { - keys.push_back(i->first); - values.push_back(i->second); - } - } -} -bool ImportSpecificationWrapper::isSubselect() const {return importSpecification->isSubselect();} -size_t ImportSpecificationWrapper::numSubselectColumns() const {return importSpecification->getSubselectColumnNames().size();} -char* ImportSpecificationWrapper::copySubselectColumnName(size_t i) const {return ::strdup(importSpecification->getSubselectColumnNames().at(i).c_str());} -char* ImportSpecificationWrapper::copySubselectColumnType(size_t i) const {return ::strdup(importSpecification->getSubselectColumnTypes().at(i).c_str());} -bool ImportSpecificationWrapper::hasConnectionName() const {return importSpecification->hasConnectionName();} -bool ImportSpecificationWrapper::hasConnectionInformation() const {return importSpecification->hasConnectionInformation();} -char* ImportSpecificationWrapper::copyConnectionName() const {return ::strdup(importSpecification->getConnectionName().c_str());} -const ConnectionInformationWrapper ImportSpecificationWrapper::getConnectionInformation() const { - return ConnectionInformationWrapper(importSpecification->getConnectionInformation()); -} - -size_t ImportSpecificationWrapper::getNumberOfParameters() const {return keys.size();} -char* ImportSpecificationWrapper::copyKey(size_t pos) const {return ::strdup(keys.at(pos).c_str());} -char* ImportSpecificationWrapper::copyValue(size_t pos) const {return ::strdup(values.at(pos).c_str());} - -} diff --git a/cpp_client/src/scriptDTOWrapper.h b/cpp_client/src/scriptDTOWrapper.h deleted file mode 100644 index f5c90fe5b..000000000 --- a/cpp_client/src/scriptDTOWrapper.h +++ /dev/null @@ -1,52 +0,0 @@ -#ifndef SCRIPT_DATA_TRANSFER_OBJECT_C_WRAPPER_H -#define SCRIPT_DATA_TRANSFER_OBJECT_C_WRAPPER_H - -#ifdef BUILDINSWIGDIR -#include "scriptDTO.h" -#else -#include -#endif - -namespace ExecutionGraph -{ - -class ConnectionInformationWrapper -{ -public: - explicit ConnectionInformationWrapper(const ConnectionInformation& connectionInformation_); - char* copyKind() const; - char* copyAddress() const; - char* copyUser() const; - char* copyPassword() const; - -protected: - const ConnectionInformation connectionInformation; -}; - - -class ImportSpecificationWrapper -{ -public: - explicit ImportSpecificationWrapper(ExecutionGraph::ImportSpecification* importSpecification_); - bool isSubselect() const; - size_t numSubselectColumns() const; - char* copySubselectColumnName(size_t i) const; - char* copySubselectColumnType(size_t i) const; - bool hasConnectionName() const; - bool hasConnectionInformation() const; - char* copyConnectionName() const; - const ConnectionInformationWrapper getConnectionInformation() const; - size_t getNumberOfParameters() const; - char* copyKey(size_t pos) const; - char* copyValue(size_t pos) const; -protected: - ImportSpecification* importSpecification; - std::vector keys; - std::vector values; -}; - - - -} - -#endif // SCRIPT_DATA_TRANSFER_OBJECT_C_WRAPPER_H diff --git a/cpp_client/src/swigcontainers.h b/cpp_client/src/swigcontainers.h deleted file mode 100644 index a61972334..000000000 --- a/cpp_client/src/swigcontainers.h +++ /dev/null @@ -1,1506 +0,0 @@ -/* -*- mode: c++; -*- */ -$ifndef SWIGCONTAINERS_H -$define SWIGCONTAINERS_H - - - -#ifndef EXTERNAL_PROCESS -$include -$include -$include -$include -$include -$include "zmqcontainer.pb.h" -$include -#else -$include -$include -$include -$include -$include -$include "zmqcontainer.pb.h" -$include "scriptDTOWrapper.h" -#endif - -$include - -$include - -namespace SWIGVMContainers { - - - - -#ifdef EXTERNAL_PROCESS -void socket_send(zmq::socket_t &socket, zmq::message_t &zmsg); -bool socket_recv(zmq::socket_t &socket, zmq::message_t &zmsg, bool return_on_error = false); -#else -class ZMQConnectionHandler; -ZMQConnectionHandler *zmq_create_internal_handler(); -void zmq_delete_internal_handler(ZMQConnectionHandler *&handler); -ZMQConnectionHandler *zmq_create_external_handler(const std::string &redirector_url); -void zmq_delete_external_handler(ZMQConnectionHandler *&handler); -#endif - -$define SWIG_MAX_VAR_DATASIZE 4000000 - -class SWIGVM; -enum VMTYPE { - VM_UNSUPPORTED = 0, - VM_PYTHON = 1, - VM_SCHEME = 2, - VM_JAVASCRIPT = 3, - VM_R = 4, - VM_EXTERNAL = 5, - VM_JAVA = 6, - VM_PLUGIN_LANGUAGE = 7 -}; - -struct SWIGVMExceptionHandler -#ifndef EXTERNAL_PROCESS -: public ExecutionGraph::VMCExceptionHandler -#endif -{ - SWIGVMExceptionHandler() { exthrowed = false; } - ~SWIGVMExceptionHandler() { } - void setException(const char* msg) { - exmsg = msg; exthrowed = true; - } - std::string exmsg; - bool exthrowed; -}; - -enum SWIGVM_datatype_e { - UNSUPPORTED = 0, - DOUBLE = 1, - INT32 = 2, - INT64 = 3, - NUMERIC = 4, - TIMESTAMP = 5, - DATE = 6, - STRING = 7, - BOOLEAN = 8, - INTERVALYM = 9, - INTERVALDS = 10, - GEOMETRY = 11 -}; - -enum SWIGVM_itertype_e { - EXACTLY_ONCE = 1, - MULTIPLE = 2 -}; - - -struct SWIGVM_columntype_t { - SWIGVM_datatype_e type; - std::string type_name; - unsigned int len; - unsigned int prec; - unsigned int scale; - SWIGVM_columntype_t(const SWIGVM_datatype_e t, const char *n, const unsigned int l, const unsigned int p, const unsigned int s): - type(t), type_name(n), len(l), prec(p), scale(s) { } - SWIGVM_columntype_t(const SWIGVM_datatype_e t, const char *n, const unsigned int l): type(t), type_name(n), len(l), prec(0), scale(0) { } - SWIGVM_columntype_t(const SWIGVM_datatype_e t, const char *n): type(t), type_name(n), len(0), prec(0), scale(0) { } - SWIGVM_columntype_t(): type(UNSUPPORTED), type_name("UNSUPPORTED"), len(0), prec(0), scale(0) { } -}; - - -struct SWIGVM_params_t { -#ifndef EXTERNAL_PROCESS - ExecutionGraph::EXScriptVMContainerBasis *vm; - ExecutionGraph::VMCCachedTableIterator *inp_tbl; - ExecutionGraph::VMCCachedResultHandler *out_tbl; - StackAllocator *out_stack; - ZMQConnectionHandler *zmqhandler; - SWIGVM *vm_container; - Mutex mutex; - bool in_use; -#else - uint64_t connection_id; - zmq::socket_t *sock; -#endif - SWIGVMExceptionHandler *exch; - char *dbname; - char *dbversion; - char *script_name; - char *script_schema; - char *current_user; - char *current_schema; - char *script_code; - unsigned long long session_id; - unsigned long statement_id; - unsigned int node_count; - unsigned int node_id; - unsigned long long vm_id; - VMTYPE vm_type; - unsigned long long maximal_memory_limit; - - std::vector *inp_names; - std::vector *inp_types; - SWIGVM_itertype_e inp_iter_type; - bool inp_force_finish; - - std::vector *out_names; - std::vector *out_types; - SWIGVM_itertype_e out_iter_type; - - bool m_allocate_params; - std::vector *is_emitted; - bool singleCallMode; - - bool isInternalLanguage; - std::string pluginName; - std::string pluginURI; - std::string outputAddress; - - - SWIGVM_params_t(): -#ifndef EXTERNAL_PROCESS - vm(NULL), inp_tbl(NULL), out_tbl(NULL), out_stack(NULL), zmqhandler(NULL), - vm_container(NULL), mutex("SWIGVM") , in_use(false), -#else - connection_id(0), sock(NULL), -#endif - exch(NULL), dbname(NULL), dbversion(NULL), script_name(NULL), script_schema(NULL), current_user(NULL), current_schema(NULL), script_code(NULL), - session_id(0), statement_id(0), node_count(0), node_id(0), vm_id(0), - vm_type(VM_UNSUPPORTED), maximal_memory_limit(0), - inp_names(NULL), inp_types(NULL), inp_iter_type(MULTIPLE), inp_force_finish(false), - out_names(NULL), out_types(NULL), out_iter_type(MULTIPLE), - m_allocate_params(false), - is_emitted(NULL), singleCallMode(false), isInternalLanguage(true), pluginName(""), pluginURI(""), outputAddress("") - { } - - SWIGVM_params_t(const bool allocate_params): -#ifndef EXTERNAL_PROCESS - vm(NULL), inp_tbl(NULL), out_tbl(NULL), out_stack(NULL), zmqhandler(NULL), - vm_container(NULL), mutex("SWIGVM") , in_use(false), -#else - connection_id(0), sock(NULL), -#endif - exch(NULL), dbname(NULL), dbversion(NULL), script_name(NULL), script_schema(NULL), current_user(NULL), current_schema(NULL), script_code(NULL), - session_id(0), statement_id(0), node_count(0), node_id(0), vm_id(0), - vm_type(VM_UNSUPPORTED), maximal_memory_limit(0), - inp_names(allocate_params ? new std::vector() : NULL), - inp_types(allocate_params ? new std::vector() : NULL), - inp_iter_type(MULTIPLE), - inp_force_finish(false), - out_names(allocate_params ? new std::vector() : NULL), - out_types(allocate_params ? new std::vector() : NULL), - out_iter_type(MULTIPLE), - m_allocate_params(allocate_params), - is_emitted(allocate_params ? new std::vector() : NULL), - singleCallMode(false), isInternalLanguage(true), pluginName(""), pluginURI(""), outputAddress("") - { } - - SWIGVM_params_t(const SWIGVM_params_t &p): -#ifndef EXTERNAL_PROCESS - vm(p.vm), - inp_tbl(p.inp_tbl), - out_tbl(p.out_tbl), - mutex("SWIGVM"), -#else - connection_id(p.connection_id), - sock(p.sock), -#endif - exch(p.exch), - dbname(p.dbname), - dbversion(p.dbversion), - script_name(p.script_name), - script_schema(p.script_schema), - current_user(p.current_user), - current_schema(p.current_schema), - script_code(p.script_code), - session_id(p.session_id), - statement_id(p.statement_id), - node_count(p.node_count), - node_id(p.node_id), - vm_id(p.vm_id), - vm_type(p.vm_type), - maximal_memory_limit(p.maximal_memory_limit), - inp_names(p.inp_names), - inp_types(p.inp_types), - inp_iter_type(p.inp_iter_type), - inp_force_finish(p.inp_force_finish), - out_names(p.out_names), - out_types(p.out_types), - out_iter_type(p.out_iter_type), - m_allocate_params(false), - is_emitted(p.is_emitted), - isInternalLanguage(p.isInternalLanguage), - pluginName(p.pluginName), - pluginURI(p.pluginURI), - outputAddress(p.outputAddress) - { - if (p.m_allocate_params) abort(); - } - ~SWIGVM_params_t() { - if (m_allocate_params) { - if (inp_names != NULL) { delete inp_names; inp_names = NULL; } - if (inp_types != NULL) { delete inp_types; inp_types = NULL; } - if (out_names != NULL) { delete out_names; out_names = NULL; } - if (out_types != NULL) { delete out_types; out_types = NULL; } - if (is_emitted != NULL) { delete is_emitted; is_emitted = NULL; } - } - } -}; -#ifdef EXTERNAL_PROCESS -extern __thread SWIGVM_params_t *SWIGVM_params; -#endif - -class SWIGMetadata { - public: - SWIGMetadata( -#ifndef EXTERNAL_PROCESS - SWIGVM_params_t *SWIGVM_params -#endif - ): -#ifndef EXTERNAL_PROCESS - m_vm(SWIGVM_params->vm), -#else - m_connection_id(SWIGVM_params->connection_id), - m_socket(*(SWIGVM_params->sock)), -#endif - m_exch(SWIGVM_params->exch), - m_db_name(SWIGVM_params->dbname), - m_db_version(SWIGVM_params->dbversion), - m_script_name(SWIGVM_params->script_name), - m_script_schema(SWIGVM_params->script_schema), - m_current_user(SWIGVM_params->current_user), - m_current_schema(SWIGVM_params->current_schema), - m_script_code(SWIGVM_params->script_code), - m_session_id(SWIGVM_params->session_id), - m_statement_id(SWIGVM_params->statement_id), - m_node_count(SWIGVM_params->node_count), - m_node_id(SWIGVM_params->node_id), - m_vm_id(SWIGVM_params->vm_id), - m_input_names(*(SWIGVM_params->inp_names)), - m_input_types(*(SWIGVM_params->inp_types)), - m_input_iter_type(SWIGVM_params->inp_iter_type), - m_output_names(*(SWIGVM_params->out_names)), - m_output_types(*(SWIGVM_params->out_types)), - m_output_iter_type(SWIGVM_params->out_iter_type), - m_memory_limit(SWIGVM_params->maximal_memory_limit), - m_vm_type(SWIGVM_params->vm_type), - m_is_emitted(*(SWIGVM_params->is_emitted)), - m_isInternalLanguage(SWIGVM_params->isInternalLanguage), - m_pluginLanguageName(SWIGVM_params->pluginName), - m_pluginURI(SWIGVM_params->pluginURI), - m_outputAddress(SWIGVM_params->outputAddress) - { - { std::stringstream sb; sb << m_session_id; m_session_id_s = sb.str(); } - { std::stringstream sb; sb << m_vm_id; m_vm_id_s = sb.str(); } - } - virtual ~SWIGMetadata() { } - - inline const char* databaseName() { return m_db_name.c_str(); } - inline const char* databaseVersion() { return m_db_version.c_str(); } - inline const char* scriptName() { return m_script_name.c_str(); } - inline const char* scriptSchema() { return m_script_schema.c_str(); } - inline const char* currentUser() { return m_current_user.c_str(); } - inline const char* currentSchema() {return m_current_schema.c_str();} - inline const char* scriptCode() { return m_script_code.c_str(); } - inline const unsigned long long sessionID() { return m_session_id; } - inline const char *sessionID_S() { return m_session_id_s.c_str(); } - inline const unsigned long statementID() { return m_statement_id; } - inline const unsigned int nodeCount() { return m_node_count; } - inline const unsigned int nodeID() { return m_node_id; } - inline const unsigned long long vmID() { return m_vm_id; } - inline const unsigned long long memoryLimit() { return m_memory_limit; } - inline const VMTYPE vmType() { return m_vm_type; } - inline const char *vmID_S() { return m_vm_id_s.c_str(); } - inline const ExecutionGraph::ConnectionInformationWrapper connectionInformation(const char* connection_name) - { -#ifndef EXTERNAL_PROCESS - try { - ExecutionGraph::ConnectionInformation ci = m_vm->getConnectionInfo(connection_name); - return ExecutionGraph::ConnectionInformationWrapper(ci); - } catch (SyntaxErrorOrAccessRuleViolation &err) { - m_exch->setException(err.what()); - } - - return ExecutionGraph::ConnectionInformationWrapper(ExecutionGraph::ConnectionInformation()); -#else - exascript_request request; - request.set_type(MT_IMPORT); - request.set_connection_id(m_connection_id); - exascript_import_req *req = request.mutable_import(); - req->set_script_name(connection_name); - req->set_kind(PB_IMPORT_CONNECTION_INFORMATION); - if (!request.SerializeToString(&m_output_buffer)) { - m_exch->setException("Communication error: failed to serialize data"); - return ExecutionGraph::ConnectionInformationWrapper(ExecutionGraph::ConnectionInformation()); - } - zmq::message_t zmsg_req((void*)m_output_buffer.c_str(), m_output_buffer.length(), NULL, NULL); - socket_send(m_socket, zmsg_req); - zmq::message_t zmsg_rep; - socket_recv(m_socket, zmsg_rep); - exascript_response response; - if (!response.ParseFromArray(zmsg_rep.data(), zmsg_rep.size())) { - m_exch->setException("Communication error: failed to parse data"); - return ExecutionGraph::ConnectionInformationWrapper(ExecutionGraph::ConnectionInformation()); - } - if (response.type() != MT_IMPORT) { - m_exch->setException("Internal error: wrong message type"); - return ExecutionGraph::ConnectionInformationWrapper(ExecutionGraph::ConnectionInformation()); - } - const exascript_import_rep &rep = response.import(); - if (rep.has_exception_message()) { - m_exch->setException(rep.exception_message().c_str()); - return ExecutionGraph::ConnectionInformationWrapper(ExecutionGraph::ConnectionInformation()); - } - if (!rep.has_connection_information()) { - m_exch->setException("Internal error: No connection information returned"); - return ExecutionGraph::ConnectionInformationWrapper(ExecutionGraph::ConnectionInformation()); - } - connection_information_rep ci = rep.connection_information(); - return ExecutionGraph::ConnectionInformationWrapper( - ExecutionGraph::ConnectionInformation(ci.kind(), ci.address(), ci.user(), ci.password())); -#endif - } - - inline const char* moduleContent(const char* name) { -#ifndef EXTERNAL_PROCESS - try { - RefSchemaScript s = m_vm->getIncludeScript(name); - m_temp_code = s->getText().c_str(); - return m_temp_code.c_str(); - } catch (SyntaxErrorOrAccessRuleViolation &err) { - m_exch->setException(err.what()); - } - return NULL; -#else - exascript_request request; - request.set_type(MT_IMPORT); - request.set_connection_id(m_connection_id); - exascript_import_req *req = request.mutable_import(); - req->set_script_name(name); - req->set_kind(PB_IMPORT_SCRIPT_CODE); - if (!request.SerializeToString(&m_output_buffer)) { - m_exch->setException("Communication error: failed to serialize data"); - return NULL; - } - zmq::message_t zmsg_req((void*)m_output_buffer.c_str(), m_output_buffer.length(), NULL, NULL); - socket_send(m_socket, zmsg_req); - zmq::message_t zmsg_rep; - socket_recv(m_socket, zmsg_rep); - exascript_response response; - if (!response.ParseFromArray(zmsg_rep.data(), zmsg_rep.size())) { - m_exch->setException("Communication error: failed to parse data"); - return NULL; - } - if (response.type() != MT_IMPORT) { - m_exch->setException("Internal error: wrong message type"); - return NULL; - } - const exascript_import_rep &rep = response.import(); - if (rep.has_exception_message()) { - m_exch->setException(rep.exception_message().c_str()); - return NULL; - } - if (!rep.has_source_code()) { - m_exch->setException("Internal error: No source code returned"); - return NULL; - } - m_temp_code = rep.source_code(); - return m_temp_code.c_str(); -#endif - } - - inline const unsigned int inputColumnCount() { return m_input_names.size(); } - inline const char *inputColumnName(unsigned int col) - { return col >= m_input_names.size() ? NULL : m_input_names[col].c_str(); } - inline const SWIGVM_datatype_e inputColumnType(unsigned int col) - { return col >= m_input_types.size() ? UNSUPPORTED : m_input_types[col].type; } - inline const char *inputColumnTypeName(unsigned int col) - { return col >= m_input_types.size() ? NULL : m_input_types[col].type_name.c_str(); } - inline const unsigned int inputColumnSize(unsigned int col) - { return col >= m_input_types.size() ? 0 : m_input_types[col].len; } - inline const unsigned int inputColumnPrecision(unsigned int col) - { return col >= m_input_types.size() ? 0 : m_input_types[col].prec; } - inline const unsigned int inputColumnScale(unsigned int col) - { return col >= m_input_types.size() ? 0 : m_input_types[col].scale; } - inline const SWIGVM_itertype_e inputType() { return m_input_iter_type; } - - inline const unsigned int outputColumnCount() { return m_output_names.size(); } - inline const char *outputColumnName(unsigned int col) { - if (m_output_iter_type == EXACTLY_ONCE && col == 0) - return "RETURN"; - return col >= m_output_names.size() ? NULL : m_output_names[col].c_str(); - } - inline const SWIGVM_datatype_e outputColumnType(unsigned int col) - { return col >= m_output_types.size() ? UNSUPPORTED : m_output_types[col].type; } - inline const char *outputColumnTypeName(unsigned int col) - { return col >= m_output_types.size() ? NULL : m_output_types[col].type_name.c_str(); } - inline const unsigned int outputColumnSize(unsigned int col) - { return col >= m_output_types.size() ? 0 : m_output_types[col].len; } - inline const unsigned int outputColumnPrecision(unsigned int col) - { return col >= m_output_types.size() ? 0 : m_output_types[col].prec; } - inline const unsigned int outputColumnScale(unsigned int col) - { return col >= m_output_types.size() ? 0 : m_output_types[col].scale; } - inline const SWIGVM_itertype_e outputType() { return m_output_iter_type; } - - inline const bool isEmittedColumn(unsigned int col){ - if (col >= m_is_emitted.size()) - { - abort(); - } - return m_is_emitted[col]; - } - - inline const char* checkException() { - if (m_exch->exthrowed) { - m_exch->exthrowed = false; - return m_exch->exmsg.c_str(); - } else return NULL; - } - - - inline const bool isInternalLanguage() - { - return m_isInternalLanguage; - } - - inline const char* pluginLanguageName() - { - return m_pluginLanguageName.c_str(); - } - - inline const char* pluginURI() - { - return m_pluginURI.c_str(); - } - - inline const char* outputAddress() - { - return m_outputAddress.c_str(); - } - - private: -#ifndef EXTERNAL_PROCESS - ExecutionGraph::EXScriptVMContainerBasis *m_vm; -#else - const uint64_t m_connection_id; - zmq::socket_t &m_socket; - std::string m_output_buffer; -#endif - SWIGVMExceptionHandler *m_exch; - const std::string m_db_name; - const std::string m_db_version; - const std::string m_script_name; - const std::string m_script_schema; - const std::string m_current_user; - const std::string m_current_schema; - const std::string m_script_code; - const unsigned long m_session_id; - const unsigned long m_statement_id; - const unsigned int m_node_count; - const unsigned int m_node_id; - const unsigned long m_vm_id; - std::string m_temp_code; - - const std::vector &m_input_names; - const std::vector &m_input_types; - const SWIGVM_itertype_e m_input_iter_type; - - const std::vector &m_output_names; - const std::vector &m_output_types; - const SWIGVM_itertype_e m_output_iter_type; - - const unsigned long long m_memory_limit; - const std::string m_meta_info; - const VMTYPE m_vm_type; - std::string m_session_id_s; - std::string m_vm_id_s; - const std::vector &m_is_emitted; - - const bool m_isInternalLanguage; - std::string m_pluginLanguageName; - std::string m_pluginURI; - std::string m_outputAddress; -}; - -class SWIGGeneralIterator { - protected: - SWIGVMExceptionHandler *m_exch; - - public: - SWIGGeneralIterator(SWIGVMExceptionHandler *exch): m_exch(exch) { } - virtual ~SWIGGeneralIterator() { } - inline const char* checkException() { - if (m_exch->exthrowed) { - m_exch->exthrowed = false; - return m_exch->exmsg.c_str(); - } else return NULL; - } -}; - -class SWIGTableIterator: public SWIGGeneralIterator { - private: -#ifndef EXTERNAL_PROCESS - SWIGVM_params_t *SWIGVM_params; - ExecutionGraph::VMCCachedTableIterator &m_table; - size_t m_string_buffer_length; - char *m_string_buffer_data; -#else - const uint64_t m_connection_id; - zmq::socket_t &m_socket; - std::string m_output_buffer; - exascript_request m_request; - exascript_response m_next_response; - uint64_t m_rows_received; - struct values_per_row_t { - uint64_t strings, bools, int32s, int64s, doubles; - values_per_row_t(): strings(0), bools(0), int32s(0), int64s(0), doubles(0) {} - void reset() { strings = bools = int32s = int64s = doubles = 0; } - } m_values_per_row; - uint64_t m_column_count; - std::vector m_col_offsets; - uint64_t m_rows_in_group; - uint64_t m_current_row; -#endif - uint64_t m_rows_completed; - uint64_t m_rows_group_completed; - bool m_was_null; - const std::vector &m_types; - -#ifdef EXTERNAL_PROCESS - void increment_col_offsets(bool reset = false) { - m_current_row = m_next_response.next().table().row_number(m_rows_completed); //local row number - uint64_t current_column = 0; - ssize_t null_index = 0; - if (reset) m_values_per_row.reset(); - null_index = m_rows_completed * m_column_count; - if (m_next_response.next().table().data_nulls_size() <= (null_index + (ssize_t)m_column_count - 1)) { - std::stringstream sb; - sb << "Internal error: not enough nulls in packet: wanted index " << (null_index + m_column_count - 1) - << " but have " << m_next_response.next().table().data_nulls_size() - << " elements"; - m_exch->setException(sb.str().c_str()); - return; - } - for (std::vector::const_iterator - it = m_types.begin(); it != m_types.end(); ++it, ++current_column, ++null_index) - { - if (m_next_response.next().table().data_nulls(null_index)) - continue; - switch (it->type) { - case UNSUPPORTED: m_exch->setException("Unsupported data type found"); return; - case DOUBLE: m_col_offsets[current_column] = m_values_per_row.doubles++; break; - case INT32: m_col_offsets[current_column] = m_values_per_row.int32s++; break; - case INT64: m_col_offsets[current_column] = m_values_per_row.int64s++; break; - case NUMERIC: - case TIMESTAMP: - case DATE: - case STRING: m_col_offsets[current_column] = m_values_per_row.strings++; break; - case BOOLEAN: m_col_offsets[current_column] = m_values_per_row.bools++; break; - default: m_exch->setException("Unknown data type found"); return; - } - } - } - - void receive_next_data(bool reset) { - m_rows_received = 0; - m_rows_completed = 0; - { // send request - m_request.Clear(); - m_request.set_connection_id(m_connection_id); - if (reset) m_request.set_type(MT_RESET); - else m_request.set_type(MT_NEXT); - if(!m_request.SerializeToString(&m_output_buffer)) { - m_exch->setException("Communication error: failed to serialize data"); - return; - } - zmq::message_t zmsg((void*)m_output_buffer.c_str(), m_output_buffer.length(), NULL, NULL); - socket_send(m_socket, zmsg); - } { // receive response - zmq::message_t zmsg; - socket_recv(m_socket, zmsg); - m_next_response.Clear(); - if (!m_next_response.ParseFromArray(zmsg.data(), zmsg.size())) { - m_exch->setException("Communication error: failed to parse data"); - return; - } - if (m_next_response.connection_id() != m_connection_id) { - m_exch->setException("Communication error: wrong connection id"); - return; - } - if (m_next_response.type() == MT_DONE) { - return; - } - if (m_next_response.type() == MT_CLOSE) { - const exascript_close &rep = m_next_response.close(); - if (!rep.has_exception_message()) { - if (m_rows_completed == 0) { - return; - } else m_exch->setException("Unknown error occured"); - } else { - m_exch->setException(rep.exception_message().c_str()); - } - return; - } - if ((reset && (m_next_response.type() != MT_RESET)) || - (!reset && (m_next_response.type() != MT_NEXT))) - { - m_exch->setException("Communication error: wrong message type"); - return; - } - m_rows_received = m_next_response.next().table().rows(); - m_rows_in_group = m_next_response.next().table().rows_in_group(); - } - increment_col_offsets(true); - } - - inline ssize_t check_index(ssize_t index, ssize_t available, const char *tp, const char *otype, const char *ts) { - if (available > index) return index; - std::stringstream sb; - sb << "Internal error: not enough " << tp << otype << ts << " in packet: wanted index " - << index << " but have " << available << " elements (on " - << m_rows_received << '/' << m_rows_completed << " of received/completed rows"; - m_exch->setException(sb.str().c_str()); - m_was_null = true; - return -1; - } - - inline ssize_t check_value(unsigned int col, ssize_t available, const char *otype) { - m_was_null = false; - ssize_t index = check_index(m_column_count * m_rows_completed + col, - m_next_response.next().table().data_nulls_size(), - "nulls for ", otype, ""); - if (m_was_null) return -1; - m_was_null = m_next_response.next().table().data_nulls(index); - if (m_was_null) return -1; - index = check_index(m_col_offsets[col], available, "", otype, "s"); - if (m_was_null) return -1; - return index; - } -#else - inline char *string_buffer(size_t length) { - if (length >= m_string_buffer_length) { - m_string_buffer_length = length; - delete[] m_string_buffer_data; - m_string_buffer_data = new char[m_string_buffer_length + 1]; - } - return m_string_buffer_data; - } - - inline char *string_buffer(const char* str, size_t length) { - char *buf = string_buffer(length); - memcpy(buf, str, length); - buf[length] = '\0'; - return buf; - } - - inline char *string_buffer(const MString &str) { - size_t strlength = str.length(); - char *buf = string_buffer(strlength); - memcpy(buf, str.c_str(), strlength); - buf[strlength] = '\0'; - return buf; - } - - inline char *string_buffer(const std::string &str) { - size_t strlength = str.length(); - char *buf = string_buffer(strlength); - memcpy(buf, str.c_str(), strlength); - buf[strlength] = '\0'; - return buf; - } -#endif - - public: - SWIGTableIterator( -#ifndef EXTERNAL_PROCESS - SWIGVM_params_t *params -#endif - ): -#ifndef EXTERNAL_PROCESS - SWIGGeneralIterator(params->exch), - SWIGVM_params(params), - m_table(*(SWIGVM_params->inp_tbl)), - m_string_buffer_length(1024), - m_string_buffer_data(new char[m_string_buffer_length + 1]), -#else - SWIGGeneralIterator(SWIGVM_params->exch), - m_connection_id(SWIGVM_params->connection_id), - m_socket(*(SWIGVM_params->sock)), - m_column_count(SWIGVM_params->inp_types->size()), - m_col_offsets(SWIGVM_params->inp_types->size()), - m_current_row((uint64_t)-1), -#endif - m_rows_completed(0), - m_rows_group_completed(1), - m_was_null(false), - m_types(*(SWIGVM_params->inp_types)) - { -#ifdef EXTERNAL_PROCESS - receive_next_data(false); -#endif - } - - ~SWIGTableIterator() { -#ifndef EXTERNAL_PROCESS - delete[] m_string_buffer_data; -#else - //std::cerr << "#### SWIGVM " << m_connection_id << " TableIterator destructor" << std::endl; -#endif - } - -#ifdef EXTERNAL_PROCESS - //returns the local row number of the row last read (used by emit to set local row number for emitted rows) - uint64_t get_current_row() - { - return m_current_row; - } -#endif - - inline void reinitialize() { -#ifdef EXTERNAL_PROCESS - m_rows_completed = 0; - m_rows_group_completed = 1; - m_values_per_row.reset(); - receive_next_data(false); -#endif - } - - inline bool next() { -#ifndef EXTERNAL_PROCESS - if (m_table.next()) { - ++m_rows_completed; - ++m_rows_group_completed; - return true; - } - return false; -#else - if (m_rows_received == 0) { - m_exch->setException("Iteration finished"); - return false; - } - ++m_rows_completed; - ++m_rows_group_completed; - if (SWIGVM_params->inp_force_finish) - return false; - if (m_rows_completed >= m_rows_received) { - receive_next_data(false); - if (m_rows_received == 0) - return false; - else return true; - } - increment_col_offsets(); - return true; -#endif - } - - inline bool eot() { -#ifndef EXTERNAL_PROCESS - return false; // TODO: dont workind now -#else - return m_rows_received == 0; -#endif - } - - inline void reset() { - m_rows_group_completed = 1; -#ifndef EXTERNAL_PROCESS - m_table.reset(); - m_rows_completed = 0; -#else - SWIGVM_params->inp_force_finish = false; - receive_next_data(true); -#endif - } - - inline unsigned long restBufferSize() { -#ifdef EXTERNAL_PROCESS - if (m_rows_completed < m_rows_received) - return m_rows_received - m_rows_completed; -#endif - return 0; - } - - inline unsigned long rowsCompleted() { - return m_rows_group_completed; - } - - inline unsigned long rowsInGroup() { -#ifdef EXTERNAL_PROCESS - return m_rows_in_group; -#else - return m_table.size(); -#endif - } - - inline double getDouble(unsigned int col) { - if (col >= m_types.size()) { m_exch->setException("Column does not exist"); m_was_null = true; return 0.0; } - if (m_types[col].type != DOUBLE) { - m_exch->setException("Wrong column type"); - m_was_null = true; - return 0.0; - } -#ifndef EXTERNAL_PROCESS - double ret = 0.0; - m_was_null = !m_table.get(col, ret); - if (m_was_null) return 0.0; - return ret; -#else - ssize_t index = check_value(col, m_next_response.next().table().data_double_size(), "double"); - if (m_was_null) { - return 0.0; - } - return m_next_response.next().table().data_double(index); -#endif - } - - inline const char *getString(unsigned int col, size_t *length = NULL) { - if (col >= m_types.size()) { m_exch->setException("Column does not exist"); m_was_null = true; return ""; } - if (m_types[col].type != STRING) { - m_exch->setException("Wrong column type"); - m_was_null = true; - return ""; - } -#ifndef EXTERNAL_PROCESS - StringData ret; - ret.string = (char*)""; - ret.length = 0; - m_was_null = !m_table.get(col, ret); - if (m_was_null) return ""; - if (length != NULL) { - *length = ret.length; - return ret.string; - } - return string_buffer(ret.string, ret.length); -#else - ssize_t index = check_value(col, m_next_response.next().table().data_string_size(), "string"); - if (m_was_null) return ""; - const std::string &s(m_next_response.next().table().data_string(index)); - if (length != NULL) *length = s.length(); - return s.c_str(); -#endif - } - - inline int32_t getInt32(unsigned int col) { - if (col >= m_types.size()) { m_exch->setException("Column does not exist"); m_was_null = true; return 0; } - if (m_types[col].type != INT32) { - m_exch->setException("Wrong column type"); - m_was_null = true; - return 0; - } -#ifndef EXTERNAL_PROCESS - int32_t ret = 0; - m_was_null = !m_table.get(col, ret); - if (m_was_null) return 0; - return ret; -#else - ssize_t index = check_value(col, m_next_response.next().table().data_int32_size(), "int32"); - if (m_was_null) return 0; - return m_next_response.next().table().data_int32(index); -#endif - } - - inline int64_t getInt64(unsigned int col) { - if (col >= m_types.size()) { m_exch->setException("Column does not exist"); m_was_null = true; return 0LL; } - if (m_types[col].type != INT64) { - m_exch->setException("Wrong column type"); - m_was_null = true; - return 0LL; - } -#ifndef EXTERNAL_PROCESS - int64_t ret = 0; - m_was_null = !m_table.get(col, ret); - if (m_was_null) return 0; - return ret; -#else - ssize_t index = check_value(col, m_next_response.next().table().data_int64_size(), "int64"); - if (m_was_null) return 0LL; - return m_next_response.next().table().data_int64(index); -#endif - } - - inline const char *getNumeric(unsigned int col) { - if (col >= m_types.size()) { m_exch->setException("Column does not exist"); m_was_null = true; return ""; } - if (m_types[col].type != NUMERIC) { m_exch->setException("Wrong column type"); m_was_null = true; return ""; } -#ifndef EXTERNAL_PROCESS - if (m_types[col].len == 4) { - smallint ret = 0; - m_was_null = !m_table.get(col, ret); - if (m_was_null) return "0"; - m_string_buffer_data[DTM::numeric_to_string(m_string_buffer_data, m_string_buffer_length, ret, m_types[col].scale, ".,")] = '\0'; - return m_string_buffer_data; - } - if (m_types[col].len == 8) { - integer ret = 0; - m_was_null = !m_table.get(col, ret); - if (m_was_null) return "0"; - m_string_buffer_data[DTM::numeric_to_string(m_string_buffer_data, m_string_buffer_length, ret, m_types[col].scale, ".,")] = '\0'; - return m_string_buffer_data; - } - if (m_types[col].len == 16) { - int128_t ret = 0; - m_was_null = !m_table.get(col, ret); - if (m_was_null) return "0"; - m_string_buffer_data[DTM::numeric_to_string(m_string_buffer_data, m_string_buffer_length, ret, m_types[col].scale, ".,")] = '\0'; - return m_string_buffer_data; - } - return "0"; -#else - ssize_t index = check_value(col, m_next_response.next().table().data_string_size(), "string"); - if (m_was_null) return "0"; - return m_next_response.next().table().data_string(index).c_str(); -#endif - } - - inline const char *getTimestamp(unsigned int col) { - if (col >= m_types.size()) { m_exch->setException("Column does not exist"); m_was_null = true; return ""; } - if (m_types[col].type != TIMESTAMP) { m_exch->setException("Wrong column type"); m_was_null = true; return ""; } -#ifndef EXTERNAL_PROCESS - timestamp_t data = 0; - m_was_null = !m_table.get(col, data); - if (m_was_null) return "1970-01-01 00:00:00.00 0000"; - return string_buffer(DTM::timestamp_to_string(7, data, "YYYY-MM-DD HH24:MI:SS.FF6")); -#else - ssize_t index = check_value(col, m_next_response.next().table().data_string_size(), "string"); - if (m_was_null) return "1970-01-01 00:00:00.00 0000"; - return m_next_response.next().table().data_string(index).c_str(); -#endif - } - - inline const char *getDate(unsigned int col) { - if (col >= m_types.size()) { m_exch->setException("Column does not exist"); m_was_null = true; return ""; } - if (m_types[col].type != DATE) { m_exch->setException("Wrong column type"); m_was_null = true; return ""; } -#ifndef EXTERNAL_PROCESS - date data = min_date; - m_was_null = !m_table.get(col, data); - if (m_was_null) return "1970-01-01"; - return string_buffer(DTM::date_to_string(7, data, "YYYY-MM-DD")); -#else - ssize_t index = check_value(col, m_next_response.next().table().data_string_size(), "string"); - if (m_was_null) return "1970-01-01"; - return m_next_response.next().table().data_string(index).c_str(); -#endif - } - - inline bool getBoolean(unsigned int col) { - if (col >= m_types.size()) { m_exch->setException("Column does not exist"); m_was_null = true; return ""; } - if (m_types[col].type != BOOLEAN) { m_exch->setException("Wrong column type"); m_was_null = true; return ""; } -#ifndef EXTERNAL_PROCESS - BooleanData ret = V_FALSE; - m_was_null = !m_table.get(col, ret); - if (m_was_null) return false; - return ret == V_TRUE; -#else - ssize_t index = check_value(col, m_next_response.next().table().data_bool_size(), "bool"); - if (m_was_null) return false; - return m_next_response.next().table().data_bool(index); -#endif - } - - inline bool wasNull() { return m_was_null; } -}; - -class SWIGResultHandler: public SWIGGeneralIterator { - private: -#ifdef EXTERNAL_PROCESS - SWIGTableIterator* m_table_iterator; - const uint64_t m_connection_id; - zmq::socket_t &m_socket; - std::string m_output_buffer; - uint64_t m_message_size; - - struct rowdata_t { - std::map null_data; - std::map bool_data; - std::map double_data; - std::map int32_data; - std::map int64_data; - std::map string_data; - }; - - rowdata_t m_rowdata; - exascript_request m_emit_request; -#else - SWIGVM_params_t *SWIGVM_params; - StackAllocator &m_stack; - uint64_t m_stack_size_offset; - ExecutionGraph::VMCCachedResultHandler &m_table; - ExecutionGraph::VMCCachedTableIterator &m_input_table; -#endif - uint64_t m_rows_emited; - const std::vector &m_types; - - public: - - SWIGResultHandler( -#ifndef EXTERNAL_PROCESS - SWIGVM_params_t *params -#else - SWIGTableIterator* table_iterator -#endif - ): -#ifndef EXTERNAL_PROCESS - SWIGGeneralIterator(params->exch), - SWIGVM_params(params), - m_stack(*(SWIGVM_params->out_stack)), - m_stack_size_offset(m_stack.getAllocatedMemory()), - m_table(*(SWIGVM_params->out_tbl)), - m_input_table(*(SWIGVM_params->inp_tbl)), -#else - SWIGGeneralIterator(SWIGVM_params->exch), - m_table_iterator(table_iterator), - m_connection_id(SWIGVM_params->connection_id), - m_socket(*(SWIGVM_params->sock)), - m_message_size(0), -#endif - m_rows_emited(1), - m_types(*(SWIGVM_params->out_types)) - { } - - ~SWIGResultHandler() { -#ifdef EXTERNAL_PROCESS - //std::cerr << "#### SWIGVM " << m_connection_id << " ResultHandler destructor" << std::endl; -#endif - } - - inline void reinitialize() { - m_rows_emited = 0; -#ifdef EXTERNAL_PROCESS - m_message_size = 0; - m_emit_request.Clear(); - m_rowdata = rowdata_t(); -#endif - } - - inline unsigned long rowsEmited() { - return m_rows_emited; - } - - inline void flush() { -#ifndef EXTERNAL_PROCESS - m_table.flush(); -#else - exascript_emit_data_req *req = m_emit_request.mutable_emit(); - exascript_table_data *table = req->mutable_table(); - if (table->has_rows() && table->rows() > 0) { - { m_emit_request.set_type(MT_EMIT); - m_emit_request.set_connection_id(m_connection_id); - if (!m_emit_request.SerializeToString(&m_output_buffer)) { - m_exch->setException("Communication error: failed to serialize data"); - return; - } - zmq::message_t zmsg((void*)m_output_buffer.c_str(), m_output_buffer.length(), NULL, NULL); - socket_send(m_socket, zmsg); - m_emit_request.Clear(); - m_message_size = 0; - } - { zmq::message_t zmsg; - socket_recv(m_socket, zmsg); - exascript_response response; - if (!response.ParseFromArray(zmsg.data(), zmsg.size())) { - m_exch->setException("Communication error: failed to parse data"); - return; - } - if (response.connection_id() != m_connection_id) { - std::stringstream sb; - sb << "Received wrong connection id " << response.connection_id() - << ", should be " << m_connection_id; - m_exch->setException(sb.str().c_str()); - return; - } - if (response.type() == MT_CLOSE) { - if (!response.close().has_exception_message()) - m_exch->setException("Unknown error occured"); - else m_exch->setException(response.close().exception_message().c_str()); - return; - } - if (response.type() != MT_EMIT) { - m_exch->setException("Wrong response type"); - return; - } - } - } -#endif - } - - inline bool next() { - ++m_rows_emited; -#ifndef EXTERNAL_PROCESS - return m_table.next(); -#else - exascript_emit_data_req *req = m_emit_request.mutable_emit(); - exascript_table_data *table = req->mutable_table(); - for (unsigned int col = 0; col < m_types.size(); ++col) { - bool null_data = m_rowdata.null_data[col]; - table->add_data_nulls(null_data); - if (null_data) continue; - switch (m_types[col].type) { - case UNSUPPORTED: - m_exch->setException("Unsupported data type found"); - return false; - case DOUBLE: - if (m_rowdata.double_data.find(col) == m_rowdata.double_data.end()) { - m_exch->setException("Not enough double columns emited"); - return false; - } - m_message_size += sizeof(double); - table->add_data_double(m_rowdata.double_data[col]); - break; - case INT32: - if (m_rowdata.int32_data.find(col) == m_rowdata.int32_data.end()) { - m_exch->setException("Not enough int32 columns emited"); - return false; - } - m_message_size += sizeof(int32_t); - table->add_data_int32(m_rowdata.int32_data[col]); - break; - case INT64: - if (m_rowdata.int64_data.find(col) == m_rowdata.int64_data.end()) { - m_exch->setException("Not enough int64 columns emited"); - return false; - } - m_message_size += sizeof(int64_t); - table->add_data_int64(m_rowdata.int64_data[col]); - break; - case NUMERIC: - case TIMESTAMP: - case DATE: - case STRING: - if (m_rowdata.string_data.find(col) == m_rowdata.string_data.end()) { - m_exch->setException("Not enough string columns emited"); - return false; - } - m_message_size += sizeof(int32_t) + m_rowdata.string_data[col].length(); - *table->add_data_string() = m_rowdata.string_data[col]; - break; - case BOOLEAN: - if (m_rowdata.bool_data.find(col) == m_rowdata.bool_data.end()) { - m_exch->setException("Not enough boolean columns emited"); - return false; - } - m_message_size += 1; - table->add_data_bool(m_rowdata.bool_data[col]); - break; - default: - m_exch->setException("Unknown data type found"); - return false; - } - } - // add local (input) row number for emitted row - table->add_row_number(m_table_iterator->get_current_row()); - m_rowdata = rowdata_t(); - if (!table->has_rows()) table->set_rows(1); - else table->set_rows(table->rows() + 1); - table->set_rows_in_group(0); - if (m_message_size >= SWIG_MAX_VAR_DATASIZE) { - if (SWIGVM_params->inp_iter_type == EXACTLY_ONCE && SWIGVM_params->out_iter_type == EXACTLY_ONCE) - SWIGVM_params->inp_force_finish = true; - else this->flush(); - } - return true; -#endif - } - - inline void setDouble(unsigned int col, const double v) { - if (col >= m_types.size()) { m_exch->setException("Column does not exist"); return; } - if (m_types[col].type != DOUBLE) { m_exch->setException("Wrong column type (not a double)"); return; } -#ifndef EXTERNAL_PROCESS - m_table.set(col, v); -#else - m_rowdata.null_data[col] = false; - m_rowdata.double_data[col] = v; -#endif - } - - inline void setString(unsigned int col, const char *v, size_t l) { - if (col >= m_types.size()) { m_exch->setException("Column does not exist"); return; } - if (m_types[col].type != STRING) { m_exch->setException("Wrong column type (not a string)"); return; } -#ifndef EXTERNAL_PROCESS - StringData arg; - arg.length = l; - arg.string = alloc_mword_aligned(m_stack, l + 1); - ::memcpy(arg.string, v, l); - arg.string[l] = '\0'; - m_table.set(col, arg, m_types[col].len); -#else - m_rowdata.null_data[col] = false; - m_rowdata.string_data[col] = v; -#endif - } - - inline void setInt32(unsigned int col, const int32_t v) { - if (col >= m_types.size()) { m_exch->setException("Column does not exist"); return; } - if (m_types[col].type != INT32) { m_exch->setException("Wrong column type (not Int32)"); return; } -#ifndef EXTERNAL_PROCESS - m_table.set(col, v); -#else - m_rowdata.null_data[col] = false; - m_rowdata.int32_data[col] = v; -#endif - } - - inline void setInt64(unsigned int col, const int64_t v) { - if (col >= m_types.size()) { m_exch->setException("Column does not exist"); return; } - if (m_types[col].type != INT64) { m_exch->setException("Wrong column type (not Int64)"); return; } -#ifndef EXTERNAL_PROCESS - m_table.set(col, v); -#else - m_rowdata.null_data[col] = false; - m_rowdata.int64_data[col] = v; -#endif - } - - inline void setNumeric(unsigned int col, const char *v) { - if (col >= m_types.size()) { m_exch->setException("Column does not exist"); return; } - if (m_types[col].type != NUMERIC) { m_exch->setException("Wrong column type (not Numeric)"); return; } -#ifndef EXTERNAL_PROCESS - if (m_types[col].len == 4) { - smallint data; - DTM::string_to_numeric(v, strlen(v), data, m_types[col].prec, m_types[col].scale, ".,", true); - m_table.set(col, data); - } else if (m_types[col].len == 8) { - integer data; - DTM::string_to_numeric(v, strlen(v), data, m_types[col].prec, m_types[col].scale, ".,", true); - m_table.set(col, data); - } else if (m_types[col].len == 16) { - int128_t data; - DTM::string_to_numeric(v, strlen(v), data, m_types[col].prec, m_types[col].scale, ".,", true); - m_table.set(col, data); - } else m_exch->setException("Numeric column length not supported"); -#else - m_rowdata.null_data[col] = false; - m_rowdata.string_data[col] = v; -#endif - } - inline void setTimestamp(unsigned int col, const char *v) { - if (col >= m_types.size()) { m_exch->setException("Column does not exist"); return; } - if (m_types[col].type != TIMESTAMP) { m_exch->setException("Wrong column type (not Timestamp)"); return; } -#ifndef EXTERNAL_PROCESS - timestamp_t data = DTM::string_to_timestamp(7, v, strlen(v), "YYYY-MM-DD HH24:MI:SS.FF6", 25); - m_table.set(col, data); -#else - m_rowdata.null_data[col] = false; - m_rowdata.string_data[col] = v; -#endif - } - inline void setDate(unsigned int col, const char *v) { - if (col >= m_types.size()) { m_exch->setException("Column does not exist"); return; } - if (m_types[col].type != DATE) { m_exch->setException("Wrong column type (not Date)"); return; } -#ifndef EXTERNAL_PROCESS - date data = DTM::string_to_date(7, v, strlen(v), "YYYY-MM-DD", 10); - m_table.set(col, data); -#else - m_rowdata.null_data[col] = false; - m_rowdata.string_data[col] = v; -#endif - } - inline void setBoolean(unsigned int col, const bool v) { - if (col >= m_types.size()) { m_exch->setException("Column does not exist"); return; } - if (m_types[col].type != BOOLEAN) { m_exch->setException("Wrong column type (not Boolean)"); return; } -#ifndef EXTERNAL_PROCESS - BooleanData arg = v ? V_TRUE : V_FALSE; - m_table.set(col, arg); -#else - m_rowdata.null_data[col] = false; - m_rowdata.bool_data[col] = v; -#endif - } - inline void setNull(unsigned int col) { -#ifndef EXTERNAL_PROCESS - m_table.setNull(col); -#else - m_rowdata.null_data[col] = true; -#endif - } - -#ifndef EXTERNAL_PROCESS - // random access on input table (used for expressions in select list) - inline bool saveInTableState(size_t& numReads, size_t& cpos) - { - return m_input_table.saveState(numReads, cpos); - } - inline void restoreInTableState(size_t numReads, size_t cpos) - { - return m_input_table.restoreState(numReads, cpos); - } - template - inline bool getInTableData(unsigned int col, uint64_t row, T& v) - { - return m_input_table.getSpecificRowCol(col, row, v); - } - inline row_number inTableNumberOfArguments() - { - return m_input_table.numberOfArguments(); - } - template - inline void setDirect(unsigned int col, T v) - { - if (col >= m_types.size()) { m_exch->setException("Column does not exist"); return; } - m_table.set(col, v); - } - inline void setGeoData(unsigned int col, const char *v, size_t l) { - if (col >= m_types.size()) { m_exch->setException("Column does not exist"); return; } - if (m_types[col].type != GEOMETRY) { m_exch->setException("Wrong column type (not geodata)"); return; } - StringData arg; - arg.length = l; - arg.string = alloc_mword_aligned(m_stack, l + 1); - ::memcpy(arg.string, v, l); - arg.string[l] = '\0'; - m_table.setGeo(col, arg, m_types[col].len); - } -#endif -}; - -class SWIGVM { - public: - struct exception: std::exception { - exception(const char *reason): m_reason(reason) { } - virtual ~exception() throw() { } - - const char* what() const throw() { return m_reason.c_str(); } - private: - std::string m_reason; - }; - - SWIGVM() { } - - virtual ~SWIGVM() { } - - virtual void shutdown() {}; - virtual void destroy() {}; - virtual bool run() = 0; - virtual std::string singleCall(single_call_function_id fn, const ExecutionGraph::ScriptDTO& args) = 0; -}; - - -struct swig_undefined_single_call_exception: public std::exception -{ - swig_undefined_single_call_exception(const std::string& fn): m_fn(fn) { } - virtual ~swig_undefined_single_call_exception() throw() { } - const std::string fn() const {return m_fn;} - const char* what() const throw() { - std::stringstream sb; - sb << "Undefined in UDF: " << m_fn; - return sb.str().c_str(); - } - private: - const std::string m_fn; -}; - - -/* Forward declaretion of implementation classes */ - -$ifdef ENABLE_PYTHON_VM -class PythonVMImpl; -$endif - -$ifdef ENABLE_R_VM -class RVMImpl; -$endif - -#ifndef EXTERNAL_PROCESS -class ZMQVMImpl; -#endif - -$ifdef ENABLE_GUILE_VM -class GuileVMImpl; -$endif - -$ifdef ENABLE_JAVA_VM -class JavaVMImpl; -$endif - -#ifdef EXTERNAL_PROCESS - -$ifdef ENABLE_PYTHON_VM - -class PythonVM: public SWIGVM { - public: - struct exception: SWIGVM::exception { - exception(const char *reason): SWIGVM::exception(reason) { } - virtual ~exception() throw() { } - }; - - PythonVM(bool checkOnly); - virtual ~PythonVM() {}; - virtual void shutdown(); - virtual bool run(); - virtual std::string singleCall(single_call_function_id fn, const ExecutionGraph::ScriptDTO& args); - private: - PythonVMImpl *m_impl; -}; - -$endif -$ifdef ENABLE_R_VM - -class RVM: public SWIGVM { - public: - struct exception: SWIGVM::exception { - exception(const char *reason): SWIGVM::exception(reason) { } - virtual ~exception() throw() { } - }; - - RVM(bool checkOnly); - virtual ~RVM() {}; - virtual bool run(); - virtual void shutdown(); - virtual std::string singleCall(single_call_function_id fn, const ExecutionGraph::ScriptDTO& args); - private: - RVMImpl *m_impl; -}; - -$endif - -$ifdef ENABLE_JAVA_VM -class JavaVMach: public SWIGVM { - public: - struct exception: SWIGVM::exception { - exception(const char *reason): SWIGVM::exception(reason) { } - virtual ~exception() throw() { } - }; - JavaVMach(bool checkOnly); - virtual ~JavaVMach() {} - virtual void shutdown(); - virtual bool run(); - virtual std::string singleCall(single_call_function_id fn, const ExecutionGraph::ScriptDTO& args); - private: - JavaVMImpl *m_impl; -}; -$endif - -#else - -class ZMQConnectionHandler; -ZMQConnectionHandler *create_internal_handler(); -ZMQConnectionHandler *create_external_handler(const std::string &redirector_url); - -class ZMQVM: public SWIGVM { -public: - struct exception: std::exception { - exception(const char *reason): m_reason(reason) { } - virtual ~exception() throw() { } - - const char* what() const throw() { return m_reason.c_str(); } - private: - std::string m_reason; - }; - - ZMQVM(VMTYPE vm_type, ZMQConnectionHandler *handler, bool checkOnly, SWIGVM_params_t ¶ms, - const std::string& chroot_path, - const std::string& exec_path, - const std::vector& ext_mount_paths, - const std::string& query, - const std::vector& nsexec_env_var, - bool isUDFPlugin); -protected: - bool called_destroy; - virtual ~ZMQVM(); -public: - virtual void destroy(); - virtual bool run(); - virtual std::string singleCall(single_call_function_id fn, const ExecutionGraph::ScriptDTO& args); - -private: - ZMQVMImpl *m_impl; -}; - -#endif - -} /* namespace SWIGVMContainers */ -$endif diff --git a/cpp_client/src/zmqcontainer.proto b/cpp_client/src/zmqcontainer.proto deleted file mode 100644 index 816f868bc..000000000 --- a/cpp_client/src/zmqcontainer.proto +++ /dev/null @@ -1,248 +0,0 @@ -// --- data definition --------------------------------------------------------- - -enum column_type { - PB_UNSUPPORTED = 0; // represents following SQL types: - PB_DOUBLE = 1; // FLOAT - PB_INT32 = 2; // DECIMAL(4, 0) - PB_INT64 = 3; // DECIMAL(8, 0) - PB_NUMERIC = 4; // all other numeric types - PB_TIMESTAMP = 5; // TIMESTAMP - PB_DATE = 6; // DATE - PB_STRING = 7; // CHAR or VARCHAR - PB_BOOLEAN = 8; // BOOL -} - -// type of iteration: -// scalar, returns -> PB_EXACTLY_ONCE -// set, emits -> PB_MULTIPLE -enum iter_type { PB_EXACTLY_ONCE = 1; PB_MULTIPLE = 2; }; - -message exascript_metadata { - message column_definition { - // Name of column as UTF-8 String - required string name = 1; - // Type of column (see "enum column_type") - optional column_type type = 2; - // Type name how it is shown in EXASolution - required string type_name = 3; - // Size of Type, for CHAR and VARCHAR types - optional uint32 size = 4; - // Precision, for DECIMAL types - optional uint32 precision = 5; - // Scale, for DECIMAL types - optional uint32 scale = 6; - } - required iter_type input_iter_type = 1; - required iter_type output_iter_type = 2; - - repeated column_definition input_columns = 3; - repeated column_definition output_columns = 4; - - // If set, do not enter the run() state but instead wait for single - // synchronous function invokations - required bool single_call_mode = 5; -} - -message exascript_table_data { - // Rows count in current message - required uint64 rows = 1; - // Rows count in current group in EXASolution - // Can be 0 if no group defined - required uint64 rows_in_group = 8; - - // Storage for following types: NUMERIC, TIMESTAMP, DATE and STRING - repeated string data_string = 2; - - repeated bool data_nulls = 3 [packed = true]; - repeated bool data_bool = 4 [packed = true]; - repeated int32 data_int32 = 5 [packed = true]; - repeated int64 data_int64 = 6 [packed = true]; - repeated double data_double = 7 [packed = true]; - // Local row numbers - repeated uint64 row_number = 9 [packed = true]; -} - -// --- initialization process -------------------------------------------------- - -message exascript_client { - // URL of the client in form: tcp://10.10.1.1:2000 - required string client_name = 1; - // UTF-8 Text, which is received from redirector and send to VM - // This string will not be interpreted by EXASolution - optional string meta_info = 2; -} - -/* -enum exascript_vmtype { - PB_VM_UNSUPPORTED = 0; - PB_VM_PYTHON = 1; - PB_VM_SCHEME = 2; - PB_VM_JAVASCRIPT = 3; - PB_VM_R = 4; - PB_VM_EXTERNAL = 5; - PB_VM_JAVA = 6; - PM_VM_PLUGIN_LANGUAGE = 7; -} -*/ - -message exascript_info { - required string database_name = 1; - required string database_version = 2; - required string script_name = 3; - required string source_code = 4; - // Session ID: long unique number of current session in EXASolution - required uint64 session_id = 5; - // Number of SQL Statement in current Session - required uint32 statement_id = 6; - // Number of nodes in EXASolution - required uint32 node_count = 7; - // Current node number, starting at 0 - required uint32 node_id = 8; - // Long unique number of VM - required uint64 vm_id = 9; - // Type of the VM - //required exascript_vmtype vm_type = 10; - // A VM should not exceed this memory limit - // For external UDF's it doesn't matter - required uint64 maximal_memory_limit = 11; - // meta_info contains the string from meta_info in exascript_client message - optional string meta_info = 12; - required string script_schema = 13; - optional string current_user = 14; - optional string current_schema = 15; -} -message exascript_ping { - // the string from exascript_client message - required string meta_info = 1; -} - -// --- connection finalization ------------------------------------------------- - -message exascript_close { - optional string exception_message = 1; -} - -// --- script and other imports -------------------------------------------------------- - -enum import_type { - PB_IMPORT_SCRIPT_CODE = 1; - PB_IMPORT_CONNECTION_INFORMATION = 2; -} - -message exascript_import_req { - required string script_name = 1; // todo?: rename to "object_name" - optional import_type kind = 2 [default=PB_IMPORT_SCRIPT_CODE]; -} - -message connection_information_rep { - required string kind = 1; - required string address = 2; - required string user = 3; - required string password = 4; -} - -message exascript_import_rep { - optional string source_code = 1; - optional string exception_message = 2; - optional connection_information_rep connection_information = 3; -} - -// --- sending data from exasolution to process -------------------------------- - -message exascript_next_data_rep { - required exascript_table_data table = 2; -} - -// --- sending data from process to exasolution -------------------------------- - -message exascript_emit_data_req { - required exascript_table_data table = 2; -} - -// --- sending a single result from process to exasolution --------------------- -message exascript_return_req { - required string result = 1; -} - -message exascript_undefined_call_req { - required string remote_fn = 1; -} - -enum single_call_function_id { - SC_FN_NIL = 0; - SC_FN_DEFAULT_OUTPUT_COLUMNS = 1; - SC_FN_VIRTUAL_SCHEMA_ADAPTER_CALL = 2; - SC_FN_GENERATE_SQL_FOR_IMPORT_SPEC = 3; -} - - -message key_value_pair { - required string key = 1; - required string value = 2; -} - -message import_specification_rep { - required bool is_subselect = 1; - optional connection_information_rep connection_information = 2; - optional string connection_name = 3; - repeated exascript_metadata.column_definition subselect_column_specification = 4; - repeated key_value_pair parameters = 5; -} - - -message exascript_single_call_rep { - required single_call_function_id fn = 1; - optional import_specification_rep import_specification = 2; - optional string json_arg = 3; // Will be refactored in separate branch, just temporary -} - -// --- general request and response messages ----------------------------------- - -enum message_type { - MT_UNKNOWN = 0; - MT_CLIENT = 1; - MT_INFO = 2; - MT_META = 3; - MT_CLOSE = 4; - MT_IMPORT = 5; - MT_NEXT = 6; - MT_RESET = 7; - MT_EMIT = 8; - MT_RUN = 9; - MT_DONE = 10; - MT_CLEANUP = 11; - MT_FINISHED = 12; - MT_PING_PONG = 13; - MT_TRY_AGAIN = 14; - MT_CALL = 15; // this message request the synchronous invokation of a function in the remote container - MT_RETURN = 16; // this message is used to communicate the result of a synchronous function invokation from the container to EXASolution - MT_UNDEFINED_CALL = 17; // this message tells EXASolution that the requested function in not available in the container -} - -; - -message exascript_request { - required message_type type = 1; - required uint64 connection_id = 2; - optional exascript_client client = 3; /* E-UDF service -> EXASolution */ - optional exascript_info info = 4; /* EXASolution -> Redirector */ - optional exascript_close close = 5; - optional exascript_import_req import = 6; - optional exascript_emit_data_req emit = 7; - optional exascript_ping ping = 8; - optional exascript_return_req call_result = 9; - optional exascript_undefined_call_req undefined_call = 10; -} - -message exascript_response { - required message_type type = 1; - required uint64 connection_id = 2; - optional exascript_client client = 3; /* Redirector -> EXASolution */ - optional exascript_info info = 4; /* EXASolution -> E-UDF service */ - optional exascript_metadata meta = 5; - optional exascript_close close = 6; - optional exascript_import_rep import = 7; - optional exascript_next_data_rep next = 8; - optional exascript_ping ping = 9; - optional exascript_single_call_rep call = 10; -} diff --git a/cpp_client/src/zmqcontainerclient.cc b/cpp_client/src/zmqcontainerclient.cc deleted file mode 100644 index aa5dc52e8..000000000 --- a/cpp_client/src/zmqcontainerclient.cc +++ /dev/null @@ -1,869 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include "scriptDTO.h" - -#ifdef ENABLE_CPP_VM -#include "cpp.h" -#endif - -#ifdef PROTEGRITY_PLUGIN_CLIENT -#include -#endif - -using namespace SWIGVMContainers; -using namespace std; -using namespace google::protobuf; - -__thread SWIGVM_params_t *SWIGVMContainers::SWIGVM_params; - -static string socket_name; -static const char *socket_name_str; -static string output_buffer; -static SWIGVMExceptionHandler exchandler; -static pid_t my_pid; //parent_pid, -//static exascript_vmtype vm_type; -static exascript_request request; -static exascript_response response; - -static string g_database_name; -static string g_database_version; -static string g_script_name; -static string g_script_schema; -static string g_current_user; -static string g_current_schema; -static string g_source_code; -static unsigned long long g_session_id; -static unsigned long g_statement_id; -static unsigned int g_node_count; -static unsigned int g_node_id; -static unsigned long long g_vm_id; -static bool g_singleCallMode; -static single_call_function_id g_singleCallFunction; -static ExecutionGraph::ImportSpecification g_singleCall_ImportSpecificationArg; -static ExecutionGraph::StringDTO g_singleCall_StringArg; -static bool remote_client; - -#ifndef NDEBUG -#define SWIGVM_LOG_CLIENT -#endif -#define SWIGVM_LOG_CLIENT -//#define LOG_COMMUNICATION - -static void external_process_check() -{ - if (remote_client) return; - if (::access(&(socket_name_str[6]), F_OK) != 0) { - ::sleep(1); // give me a chance to die with my parent process - cerr << "exaudfclient aborting ... cannot access socket file " << socket_name_str+6 << "." << endl; -#ifdef SWIGVM_LOG_CLIENT - cerr << "### SWIGVM aborting with name '" << socket_name_str - << "' (" << ::getppid() << ',' << ::getpid() << ')' << endl; -#endif - ::abort(); - } -} - - -static bool keep_checking = true; - -void *check_thread_routine(void* data) -{ - while(keep_checking) { - external_process_check(); - ::usleep(100000); - } - return NULL; - -} - -void SWIGVMContainers::socket_send(zmq::socket_t &socket, zmq::message_t &zmsg) -{ -#ifdef LOG_COMMUNICATION - stringstream sb; - uint32_t len = zmsg.size(); - sb << "/tmp/zmqcomm_log_" << ::getpid() << "_send.data"; - int fd = ::open(sb.str().c_str(), O_CREAT | O_APPEND | O_WRONLY, 00644); - if (fd >= 0) { - ::write(fd, &len, sizeof(uint32_t)); - ::write(fd, zmsg.data(), len); - ::close(fd); - } -#endif - for (;;) { - try { - if (socket.send(zmsg) == true) - return; - external_process_check(); - } catch (std::exception &err) { - external_process_check(); - } catch (...) { - external_process_check(); - } - ::usleep(100000); - } -} - -bool SWIGVMContainers::socket_recv(zmq::socket_t &socket, zmq::message_t &zmsg, bool return_on_error) -{ - for (;;) { - try { - if (socket.recv(&zmsg) == true) { -#ifdef LOG_COMMUNICATION - stringstream sb; - uint32_t len = zmsg.size(); - sb << "/tmp/zmqcomm_log_" << ::getpid() << "_recv.data"; - int fd = ::open(sb.str().c_str(), O_CREAT | O_APPEND | O_WRONLY, 00644); - if (fd >= 0) { - ::write(fd, &len, sizeof(uint32_t)); - ::write(fd, zmsg.data(), len); - ::close(fd); - } -#endif - return true; - } - external_process_check(); - } catch (std::exception &err) { - external_process_check(); - - } catch (...) { - external_process_check(); - } - if (return_on_error) return false; - ::usleep(100000); - } - return false; -} - -static bool send_init(zmq::socket_t &socket, const string client_name) -{ - request.Clear(); - request.set_type(MT_CLIENT); - request.set_connection_id(0); - exascript_client *req = request.mutable_client(); - req->set_client_name(client_name); - if (!request.SerializeToString(&output_buffer)) { - exchandler.setException("Communication error: failed to serialize data"); - return false; - } - zmq::message_t zmsg((void*)output_buffer.c_str(), output_buffer.length(), NULL, NULL); - socket_send(socket, zmsg); - - zmq::message_t zmsgrecv; - response.Clear(); - if (!socket_recv(socket, zmsgrecv, true)) - return false; - if (!response.ParseFromArray(zmsgrecv.data(), zmsgrecv.size())) { - exchandler.setException("Failed to parse data"); - return false; - } - - SWIGVM_params->connection_id = response.connection_id(); -#ifdef SWIGVM_LOG_CLIENT - stringstream sb; sb << std::hex << SWIGVM_params->connection_id; - cerr << "### SWIGVM connected with id " << sb.str() << endl; -#endif - if (response.type() == MT_CLOSE) { - if (response.close().has_exception_message()) - exchandler.setException(response.close().exception_message().c_str()); - else exchandler.setException("Connection closed by server"); - return false; - } - if (response.type() != MT_INFO) { - exchandler.setException("Wrong message type, should be MT_INFO"); - return false; - } - const exascript_info &rep = response.info(); - g_database_name = rep.database_name(); - g_database_version = rep.database_version(); - g_script_name = rep.script_name(); - g_script_schema = rep.script_schema(); - g_current_user = rep.current_user(); - g_current_schema = rep.current_schema(); - g_source_code = rep.source_code(); - g_session_id = rep.session_id(); - g_statement_id = rep.statement_id(); - g_node_count = rep.node_count(); - g_node_id = rep.node_id(); - g_vm_id = rep.vm_id(); - //vm_type = rep.vm_type(); - - - SWIGVM_params->maximal_memory_limit = rep.maximal_memory_limit(); - struct rlimit d; - d.rlim_cur = d.rlim_max = rep.maximal_memory_limit(); - if (setrlimit(RLIMIT_RSS, &d) != 0) -#ifdef SWIGVM_LOG_CLIENT - cerr << "WARNING: Failed to set memory limit" << endl; -#else - throw SWIGVM::exception("Failed to set memory limit"); -#endif - d.rlim_cur = d.rlim_max = 0; // 0 for no core dumps, RLIM_INFINITY to enable coredumps of any size - if (setrlimit(RLIMIT_CORE, &d) != 0) -#ifdef SWIGVM_LOG_CLIENT - cerr << "WARNING: Failed to set core limit" << endl; -#else - throw SWIGVM::exception("Failed to set core limit"); -#endif - /* d.rlim_cur = d.rlim_max = 65536; */ - getrlimit(RLIMIT_NOFILE,&d); - if (d.rlim_max < 32768) - { -//#ifdef SWIGVM_LOG_CLIENT - cerr << "WARNING: Reducing RLIMIT_NOFILE below 32768" << endl; -//#endif - } - d.rlim_cur = d.rlim_max = std::min(32768,(int)d.rlim_max); - if (setrlimit(RLIMIT_NOFILE, &d) != 0) -#ifdef SWIGVM_LOG_CLIENT - cerr << "WARNING: Failed to set nofile limit" << endl; -#else - throw SWIGVM::exception("Failed to set nofile limit"); -#endif - d.rlim_cur = d.rlim_max = 32768; - if (setrlimit(RLIMIT_NPROC, &d) != 0) - { - cerr << "WARNING: Failed to set nproc limit to 32k trying 8k ..." << endl; - d.rlim_cur = d.rlim_max = 8192; - if (setrlimit(RLIMIT_NPROC, &d) != 0) -#ifdef SWIGVM_LOG_CLIENT - cerr << "WARNING: Failed to set nproc limit" << endl; -#else - throw SWIGVM::exception("Failed to set nproc limit"); -#endif - } - - { /* send meta request */ - request.Clear(); - request.set_type(MT_META); - request.set_connection_id(SWIGVM_params->connection_id); - if (!request.SerializeToString(&output_buffer)) { - exchandler.setException("Communication error: failed to serialize data"); - return false; - } - zmq::message_t zmsg((void*)output_buffer.c_str(), output_buffer.length(), NULL, NULL); - socket_send(socket, zmsg); - } /* receive meta response */ - { zmq::message_t zmsg; - socket_recv(socket, zmsg); - response.Clear(); - if (!response.ParseFromArray(zmsg.data(), zmsg.size())) { - exchandler.setException("Communication error: failed to parse data"); - return false; - } - if (response.type() == MT_CLOSE) { - if (response.close().has_exception_message()) - exchandler.setException(response.close().exception_message().c_str()); - else exchandler.setException("Connection closed by server"); - return false; - } - if (response.type() != MT_META) { - exchandler.setException("Wrong message type, should be META"); - return false; - } - const exascript_metadata &rep = response.meta(); - g_singleCallMode = rep.single_call_mode(); - SWIGVM_params->inp_iter_type = (SWIGVM_itertype_e)(rep.input_iter_type()); - SWIGVM_params->out_iter_type = (SWIGVM_itertype_e)(rep.output_iter_type()); - for (int col = 0; col < rep.input_columns_size(); ++col) { - const exascript_metadata_column_definition &coldef = rep.input_columns(col); - SWIGVM_params->inp_names->push_back(coldef.name()); - SWIGVM_params->inp_types->push_back(SWIGVM_columntype_t()); - SWIGVM_columntype_t &coltype = SWIGVM_params->inp_types->back(); - coltype.len = 0; coltype.prec = 0; coltype.scale = 0; - coltype.type_name = coldef.type_name(); - switch (coldef.type()) { - case PB_UNSUPPORTED: - exchandler.setException("Unsupported column type found"); - return false; - case PB_DOUBLE: - coltype.type = DOUBLE; - break; - case PB_INT32: - coltype.type = INT32; - coltype.prec = coldef.precision(); - coltype.scale = coldef.scale(); - break; - case PB_INT64: - coltype.type = INT64; - coltype.prec = coldef.precision(); - coltype.scale = coldef.scale(); - break; - case PB_NUMERIC: - coltype.type = NUMERIC; - coltype.prec = coldef.precision(); - coltype.scale = coldef.scale(); - break; - case PB_TIMESTAMP: - coltype.type = TIMESTAMP; - break; - case PB_DATE: - coltype.type = DATE; - break; - case PB_STRING: - coltype.type = STRING; - coltype.len = coldef.size(); - break; - case PB_BOOLEAN: - coltype.type = BOOLEAN; - break; - default: - exchandler.setException("Unknown column type found"); - return false; - } - } - for (int col = 0; col < rep.output_columns_size(); ++col) { - const exascript_metadata_column_definition &coldef = rep.output_columns(col); - SWIGVM_params->out_names->push_back(coldef.name()); - SWIGVM_params->out_types->push_back(SWIGVM_columntype_t()); - SWIGVM_columntype_t &coltype = SWIGVM_params->out_types->back(); - coltype.len = 0; coltype.prec = 0; coltype.scale = 0; - coltype.type_name = coldef.type_name(); - switch (coldef.type()) { - case PB_UNSUPPORTED: - exchandler.setException("Unsupported column type found"); - return false; - case PB_DOUBLE: - coltype.type = DOUBLE; - break; - case PB_INT32: - coltype.type = INT32; - coltype.prec = coldef.precision(); - coltype.scale = coldef.scale(); - break; - case PB_INT64: - coltype.type = INT64; - coltype.prec = coldef.precision(); - coltype.scale = coldef.scale(); - break; - case PB_NUMERIC: - coltype.type = NUMERIC; - coltype.prec = coldef.precision(); - coltype.scale = coldef.scale(); - break; - case PB_TIMESTAMP: - coltype.type = TIMESTAMP; - break; - case PB_DATE: - coltype.type = DATE; - break; - case PB_STRING: - coltype.type = STRING; - coltype.len = coldef.size(); - break; - case PB_BOOLEAN: - coltype.type = BOOLEAN; - break; - default: - exchandler.setException("Unknown column type found"); - return false; - } - } - } - return true; -} - -static void send_close(zmq::socket_t &socket, const string &exmsg) -{ - request.Clear(); - request.set_type(MT_CLOSE); - request.set_connection_id(SWIGVM_params->connection_id); - exascript_close *req = request.mutable_close(); - if (exmsg != "") req->set_exception_message(exmsg); - request.SerializeToString(&output_buffer); - zmq::message_t zmsg((void*)output_buffer.c_str(), output_buffer.length(), NULL, NULL); - socket_send(socket, zmsg); - - { /* receive finished response, so we know that the DB knows that we are going to close and - all potential exceptions have been received on DB side */ - zmq::message_t zmsg; - socket_recv(socket, zmsg); - response.Clear(); - if(!response.ParseFromArray(zmsg.data(), zmsg.size())) - throw SWIGVM::exception("Communication error: failed to parse data"); - else if (response.type() != MT_FINISHED) - throw SWIGVM::exception("Wrong response type, should be finished"); - } -} - -static bool send_run(zmq::socket_t &socket) -{ - { - /* send done request */ - request.Clear(); - request.set_type(MT_RUN); - request.set_connection_id(SWIGVM_params->connection_id); - if (!request.SerializeToString(&output_buffer)) - { - throw SWIGVM::exception("Communication error: failed to serialize data"); - } - zmq::message_t zmsg((void*)output_buffer.c_str(), output_buffer.length(), NULL, NULL); - socket_send(socket, zmsg); - } { /* receive done response */ - zmq::message_t zmsg; - socket_recv(socket, zmsg); - response.Clear(); - if (!response.ParseFromArray(zmsg.data(), zmsg.size())) - throw SWIGVM::exception("Communication error: failed to parse data"); - if (response.type() == MT_CLOSE) { - if (response.close().has_exception_message()) - throw SWIGVM::exception(response.close().exception_message().c_str()); - throw SWIGVM::exception("Wrong response type, got empty close response"); - } else if (response.type() == MT_CLEANUP) { - return false; - } else if (g_singleCallMode && response.type() == MT_CALL) { - assert(g_singleCallMode); - exascript_single_call_rep sc = response.call(); - g_singleCallFunction = sc.fn(); - - switch (g_singleCallFunction) - { - case SC_FN_NIL: - case SC_FN_DEFAULT_OUTPUT_COLUMNS: - break; - case SC_FN_GENERATE_SQL_FOR_IMPORT_SPEC: - { - - if (!sc.has_import_specification()) - { - throw SWIGVM::exception("internal error: SC_FN_GENERATE_SQL_FOR_IMPORT_SPEC without import specification"); - } - const import_specification_rep& is_proto = sc.import_specification(); - g_singleCall_ImportSpecificationArg = ExecutionGraph::ImportSpecification(is_proto.is_subselect()); - if (is_proto.has_connection_information()) - { - const connection_information_rep& ci_proto = is_proto.connection_information(); - ExecutionGraph::ConnectionInformation connection_info(ci_proto.kind(), ci_proto.address(), ci_proto.user(), ci_proto.password()); - g_singleCall_ImportSpecificationArg.setConnectionInformation(connection_info); - } - if (is_proto.has_connection_name()) - { - g_singleCall_ImportSpecificationArg.setConnectionName(is_proto.connection_name()); - } - for (int i=0; iset_result(result.c_str()); - request.set_allocated_call_result(rr); - request.set_connection_id(SWIGVM_params->connection_id); - if (!request.SerializeToString(&output_buffer)) - throw SWIGVM::exception("Communication error: failed to serialize data"); - zmq::message_t zmsg((void*)output_buffer.c_str(), output_buffer.length(), NULL, NULL); - socket_send(socket, zmsg); - } { /* receive return response */ - zmq::message_t zmsg; - socket_recv(socket, zmsg); - response.Clear(); - if (!response.ParseFromArray(zmsg.data(), zmsg.size())) - throw SWIGVM::exception("Communication error: failed to parse data"); - if (response.type() == MT_CLOSE) { - if (response.close().has_exception_message()) - throw SWIGVM::exception(response.close().exception_message().c_str()); - throw SWIGVM::exception("Wrong response type, got empty close response"); - } else if (response.type() == MT_CLEANUP) { - return false; - } else if (response.type() != MT_RETURN) { - throw SWIGVM::exception("Wrong response type, should be MT_RETURN"); - } - } - return true; -} - -static void send_undefined_call(zmq::socket_t &socket, const std::string& fn) -{ - { /* send return request */ - request.Clear(); - request.set_type(MT_UNDEFINED_CALL); - ::exascript_undefined_call_req* uc = new ::exascript_undefined_call_req(); - uc->set_remote_fn(fn); - request.set_allocated_undefined_call(uc); - request.set_connection_id(SWIGVM_params->connection_id); - if (!request.SerializeToString(&output_buffer)) - throw SWIGVM::exception("Communication error: failed to serialize data"); - zmq::message_t zmsg((void*)output_buffer.c_str(), output_buffer.length(), NULL, NULL); - socket_send(socket, zmsg); - } { /* receive return response */ - zmq::message_t zmsg; - socket_recv(socket, zmsg); - response.Clear(); - if (!response.ParseFromArray(zmsg.data(), zmsg.size())) - throw SWIGVM::exception("Communication error: failed to parse data"); - if (response.type() != MT_UNDEFINED_CALL) { - throw SWIGVM::exception("Wrong response type, should be MT_UNDEFINED_CALL"); - } - } -} - - -static bool send_done(zmq::socket_t &socket) -{ - { /* send done request */ - request.Clear(); - request.set_type(MT_DONE); - request.set_connection_id(SWIGVM_params->connection_id); - if (!request.SerializeToString(&output_buffer)) - throw SWIGVM::exception("Communication error: failed to serialize data"); - zmq::message_t zmsg((void*)output_buffer.c_str(), output_buffer.length(), NULL, NULL); - socket_send(socket, zmsg); - } { /* receive done response */ - zmq::message_t zmsg; - socket_recv(socket, zmsg); - response.Clear(); - if (!response.ParseFromArray(zmsg.data(), zmsg.size())) - throw SWIGVM::exception("Communication error: failed to parse data"); - if (response.type() == MT_CLOSE) { - if (response.close().has_exception_message()) - throw SWIGVM::exception(response.close().exception_message().c_str()); - throw SWIGVM::exception("Wrong response type, got empty close response"); - } else if (response.type() == MT_CLEANUP) { - return false; - } else if (response.type() != MT_DONE) - throw SWIGVM::exception("Wrong response type, should be done"); - } - return true; -} - -static void send_finished(zmq::socket_t &socket) -{ - { /* send done request */ - request.Clear(); - request.set_type(MT_FINISHED); - request.set_connection_id(SWIGVM_params->connection_id); - if (!request.SerializeToString(&output_buffer)) - throw SWIGVM::exception("Communication error: failed to serialize data"); - zmq::message_t zmsg((void*)output_buffer.c_str(), output_buffer.length(), NULL, NULL); - socket_send(socket, zmsg); - } { /* receive done response */ - zmq::message_t zmsg; - socket_recv(socket, zmsg); - response.Clear(); - if(!response.ParseFromArray(zmsg.data(), zmsg.size())) - throw SWIGVM::exception("Communication error: failed to parse data"); - if (response.type() == MT_CLOSE) { - if (response.close().has_exception_message()) - throw SWIGVM::exception(response.close().exception_message().c_str()); - throw SWIGVM::exception("Wrong response type, got empty close response"); - } else if (response.type() != MT_FINISHED) - throw SWIGVM::exception("Wrong response type, should be finished"); - } -} - -int main(int argc, char **argv) { -#ifdef PROTEGRITY_PLUGIN_CLIENT - if (argc != 2) { - cerr << "Usage: " << argv[0] << " " << endl; - return 1; - } -#else - if (argc != 3) { - cerr << "Usage: " << argv[0] << " lang=python|lang=r|lang=java" << endl; - return 1; - } -#endif - - if (::setenv("HOME", "/tmp", 1) == -1) - { - throw SWIGVM::exception("Failed to set HOME directory"); - } - ::setlocale(LC_ALL, "en_US.utf8"); - - -#ifdef PROTEGRITY_PLUGIN_CLIENT - stringstream socket_name_ss; -#endif - socket_name = argv[1]; - socket_name_str = argv[1]; - const char *socket_name_file = argv[1]; - - remote_client = false; - my_pid = ::getpid(); - SWIGVM_params = new SWIGVM_params_t(true); - zmq::context_t context(1); - -#ifdef SWIGVM_LOG_CLIENT - for (int i = 0; i 6 && strncmp(socket_name_str, "ipc:", 4) == 0) - { -#ifdef PROTEGRITY_PLUGIN_CLIENT - if (strncmp(socket_name_str, "ipc:///tmp/", 11) == 0) { - socket_name_ss << "ipc://" << getenv("NSEXEC_TMP_PATH") << '/' << &(socket_name_file[11]); - socket_name = socket_name_ss.str(); - socket_name_str = strdup(socket_name_ss.str().c_str()); - socket_name_file = socket_name_str; - } -#endif - socket_name_file = &(socket_name_file[6]); - } - -#ifdef SWIGVM_LOG_CLIENT - cerr << "### SWIGVM starting " << argv[0] << " with name '" << socket_name - << " (" << ::getppid() << ',' << ::getpid() << "): '" - << argv[1] - << '\'' << endl; -#endif - pthread_t check_thread; - if (!remote_client) - pthread_create(&check_thread, NULL, check_thread_routine, NULL); - - int linger_timeout = 0; - int recv_sock_timeout = 1000; - int send_sock_timeout = 1000; - - if (remote_client) { - recv_sock_timeout = 10000; - send_sock_timeout = 5000; - } - -reinit: - zmq::socket_t socket(context, ZMQ_REQ); - - socket.setsockopt(ZMQ_LINGER, &linger_timeout, sizeof(linger_timeout)); - socket.setsockopt(ZMQ_RCVTIMEO, &recv_sock_timeout, sizeof(recv_sock_timeout)); - socket.setsockopt(ZMQ_SNDTIMEO, &send_sock_timeout, sizeof(send_sock_timeout)); - - if (remote_client) socket.bind(socket_name_str); - else socket.connect(socket_name_str); - - SWIGVM_params->sock = &socket; - SWIGVM_params->exch = &exchandler; - - if (!send_init(socket, socket_name)) { - if (!remote_client && exchandler.exthrowed) { - send_close(socket, exchandler.exmsg); - return 1; - } - goto reinit; - } - - SWIGVM_params->dbname = (char*) g_database_name.c_str(); - SWIGVM_params->dbversion = (char*) g_database_version.c_str(); - SWIGVM_params->script_name = (char*) g_script_name.c_str(); - SWIGVM_params->script_schema = (char*) g_script_schema.c_str(); - SWIGVM_params->current_user = (char*) g_current_user.c_str(); - SWIGVM_params->current_schema = (char*) g_current_schema.c_str(); - SWIGVM_params->script_code = (char*) g_source_code.c_str(); - SWIGVM_params->session_id = g_session_id; - SWIGVM_params->statement_id = g_statement_id; - SWIGVM_params->node_count = g_node_count; - SWIGVM_params->node_id = g_node_id; - SWIGVM_params->vm_id = g_vm_id; - SWIGVM_params->singleCallMode = g_singleCallMode; - - SWIGVM *vm = NULL; - try { -#ifdef PROTEGRITY_PLUGIN_CLIENT - vm = new Protegrity(false); -#else - if (strcmp(argv[2], "lang=python")==0) - { -#ifdef ENABLE_PYTHON_VM - vm = new PythonVM(false); -#else - send_close(socket, "Unknown or unsupported VM type"); - return 1; -#endif - } else if (strcmp(argv[2], "lang=r")==0) - { -#ifdef ENABLE_R_VM - vm = new RVM(false); -#else - send_close(socket, "Unknown or unsupported VM type"); - return 1; -#endif - } else if (strcmp(argv[2], "lang=java")==0) - { -#ifdef ENABLE_JAVA_VM - vm = new JavaVMach(false); -#else - send_close(socket, "Unknown or unsupported VM type"); - return 1; -#endif - } else if (strcmp(argv[2], "lang=cpp")==0) - { -#ifdef ENABLE_CPP_VM - vm = new CPPVM(false); -#else - send_close(socket, "Unknown or unsupported VM type: CPP"); - return 1; -#endif - } else { - send_close(socket, "Unknown or unsupported VM type"); - return 1; - } -#endif - - if (g_singleCallMode) { - ExecutionGraph::EmptyDTO noArg; // used as dummy arg - for (;;) { - // in single call mode, after MT_RUN from the client, - // EXASolution responds with a CALL message that specifies - // the single call function to be made - if (!send_run(socket)) {break;} - assert(g_singleCallFunction != SC_FN_NIL); - try { - std::string result; - switch (g_singleCallFunction) - { - case SC_FN_NIL: - break; - case SC_FN_DEFAULT_OUTPUT_COLUMNS: - result = vm->singleCall(g_singleCallFunction,noArg); - break; - case SC_FN_GENERATE_SQL_FOR_IMPORT_SPEC: - assert(!g_singleCall_ImportSpecificationArg.isEmpty()); - result = vm->singleCall(g_singleCallFunction,g_singleCall_ImportSpecificationArg); - g_singleCall_ImportSpecificationArg = ExecutionGraph::ImportSpecification(); // delete the last argument - break; - case SC_FN_VIRTUAL_SCHEMA_ADAPTER_CALL: - assert(!g_singleCall_StringArg.isEmpty()); - result = vm->singleCall(g_singleCallFunction,g_singleCall_StringArg); - break; - } - send_return(socket,result); - if (!send_done(socket)) { - break; - } - } catch (const swig_undefined_single_call_exception& ex) { - send_undefined_call(socket,ex.fn()); - } - } - } else { - for(;;) { - if (!send_run(socket)) - { - break; - } - SWIGVM_params->inp_force_finish = false; - while(!vm->run()) { - } - if (!send_done(socket)) - { - break; - } - } - } - if (vm) - { - vm->shutdown(); - delete vm; - vm = NULL; - } - send_finished(socket); - std::cerr << "stm652: sent finish" << std::endl; - } catch (std::exception &err) { - send_close(socket, err.what()); socket.close(); -#ifdef SWIGVM_LOG_CLIENT - cerr << "### SWIGVM crashing with name '" << socket_name - << " (" << ::getppid() << ',' << ::getpid() << "): " << err.what() << endl; -#endif - goto error; - } catch (...) { - send_close(socket, "Internal/Unknown error throwed"); socket.close(); -#ifdef SWIGVM_LOG_CLIENT - cerr << "### SWIGVM crashing with name '" << socket_name - << " (" << ::getppid() << ',' << ::getpid() << ')' << endl; -#endif - goto error; - } -#ifdef SWIGVM_LOG_CLIENT - cerr << "### SWIGVM finishing with name '" << socket_name - << " (" << ::getppid() << ',' << ::getpid() << ')' << endl; -#endif - keep_checking = false; - socket.close(); - if (!remote_client) { - ::pthread_cancel(check_thread); - ::unlink(socket_name_file); - } - return 0; - -error: - keep_checking = false; - if (vm != NULL) - { - vm->shutdown(); - delete vm; - vm = NULL; - } - - socket.close(); - if (!remote_client) { - ::pthread_cancel(check_thread); - ::unlink(socket_name_file); - } else { - ::sleep(3); // give other components time to shutdown - } - return 1; -} diff --git a/python_client/Makefile b/python_client/Makefile deleted file mode 100644 index 0ec3e220c..000000000 --- a/python_client/Makefile +++ /dev/null @@ -1,19 +0,0 @@ -#!/bin/sh - - -all: pythonclient.tar.gz - -script_client.proto: - wget https://raw.githubusercontent.com/EXASOL/script-languages/master/src/zmqcontainer.proto - -script_client_pb2.py: script_client.proto - protoc -I. $< --python_out=. - - -pythonclient.tar.gz: script_client_pb2.py common_client.py - tar -zcf pythonclient.tar.gz common_client.py python2 python3 script_client_pb2.py - -.PHONY: clean -clean: - @rm -f script_client_pb2.py pythonclient.tar.gz - diff --git a/python_client/README.md b/python_client/README.md deleted file mode 100644 index faa1560f9..000000000 --- a/python_client/README.md +++ /dev/null @@ -1,98 +0,0 @@ -# Python2 and Python3 Client - -## Overview - -This script language client that implements Python2 and Python3 as a -language for EXASOL. It is implemented in pure Python and uses the -same source code for both languages. Users can provide Python code in -CREATE SCRIPT statements. The code is compiled on the fly and then -executed. The client is organized like this: The main folder contains -`common_client.py` which implements the actual language client. The -subfolders `python2` and `python3` include small wrapper scripts which -invoke the appropriate Python interpreter and then import the common -code. - -## Building the client - -The client depends on the existence of Python version of the Protobuf -code in `../script_client.proto`, which will be downloaded and -archived together with the actual client. - -We recommend to build the client in the same Linux container that it - is later run in. For this purpose, we can either build one ourselfs - from scratch like described in the documentation about the EXASOL - Linux container for script languages, or we simply import the one the - is installed in your version of EXASOL. For instance, if you have - EXASOL in a virtual machine with a local ip addree 192.168.56.104 on - your development computer and have configured default BucketFS to - listen to port 2580 via EXAoperation, the pre-installed Linux - container can be imported as follows: - - -``` -docker import http://192.168.56.104:2580/default/EXAClusterOS/ScriptLanguages-6.0.0.tar.gz mydockname -``` - -Now we start the container and share the folder `py` like this: - -``` -docker run -v `pwd`/python_client:/py --name=mydockname -it mydockname /bin/bash -``` - -Now we can build the client by typing `make` in the `/py` folder: - -``` -$ docker run -v `pwd`/python_client:/py --name=sl02 -it sl02 /bin/bash -root@d76fab66fd20:/# cd py/ -root@d76fab66fd20:/py# make -wget https://raw.githubusercontent.com/EXASOL/script-languages/master/script_client.proto ---2016-10-14 20:36:13-- https://raw.githubusercontent.com/EXASOL/script-languages/master/script_client.proto -Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.60.133 -Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.60.133|:443... connected. -HTTP request sent, awaiting response... 200 OK -Length: 8011 (7.8K) [text/plain] -Saving to: 'script_client.proto' - -script_client.proto 100%[=========================================================>] 7.82K --.-KB/s in 0s - -2016-10-14 20:36:13 (48.3 MB/s) - 'script_client.proto' saved [8011/8011] - -protoc -I. script_client.proto --python_out=. -tar -zcf pythonclient.tar.gz common_client.py python2 python3 script_client_pb2.py -``` - - -## Deploying the client - -After building the client, exit docker and upload the client into BucketFS. Here we assume that you have created a bucket named `py` (with write-password `writepw`) in the default BucketFS: - -``` -curl -vX PUT -T pythonclient.tar.gz http://w:writepw@192.168.56.104:2580/py/pythonclient.tar.gz -``` - -Finally, in order to use our new version of Python2 and Python3 in SQL, you need to inform the SQL compiler about the new languages. To do so in your current SQL session, you need to modify the session/system parameter SCRIPT_LANGUAGES, for instance like this: - -``` -alter session set script_languages = 'PYTHON=builtin_python R=builtin_r JAVA=builtin_java PY2=localzmq+protobuf:///bfsdefault/default/EXAClusterOS/ScriptLanguages-6.0.0#buckets/bfsdefault/py/pythonclient/python2/client PY3=localzmq+protobuf:///bfsdefault/default/EXAClusterOS/ScriptLanguages-6.0.0#buckets/bfsdefault/py/pythonclient/python3/client'; -``` - - -Note:If you are using `alter session`, you need to re-issue the command above when you start a new session. - -## Example -Now Python2 in pure Python and Python3 is available as script language: - -``` -create or replace PY2 scalar script p2() returns double as -from math import pi -def run(c): - return pi * 2 -/ - -create or replace PY3 scalar script p3() returns double as -import math -def run(c): - return math.pi * 3 -/ - -``` \ No newline at end of file diff --git a/python_client/common_client.py b/python_client/common_client.py deleted file mode 100755 index 9feaecc66..000000000 --- a/python_client/common_client.py +++ /dev/null @@ -1,577 +0,0 @@ -import traceback, sys,os - -#################################################################################### -import zmq, decimal, datetime, imp -import datetime -import script_client_pb2 as z - -if sys.version_info > (3,): - global unicode - unicode = str - - - - -def tprint(*msg): - current_time = datetime.datetime.now() - print(current_time.isoformat(), msg) - -MAX_DATASIZE = 6000000 - -class Comm: - """ General communication function - - Connects to EXASolution with given client_name and works as - callable object, to send requests and receive - responses. Request and response objects are cleared - automatically on each call, after sending request and before - receiving response. - """ - - def __init__(self, client_name): - """This function estabilish the connection to EXASolution""" - self.z = z - self.connection_id = 0 - self.req = z.exascript_request() - self.rep = z.exascript_response() - self.client_name = client_name - self.zcontext = zmq.Context() - self.zsocket = self.zcontext.socket(zmq.REQ) - if self.client_name.startswith('tcp://'): - self.zsocket.bind(self.client_name) - elif self.client_name.startswith('unix://'): - self.zsocket.connect(self.client_name) - elif self.client_name.startswith('ipc://'): - self.zsocket.connect(self.client_name) - else: raise RuntimeError("Unsupported protocol, supported are only ipc://, tcp://, and unix://") - - - def __call__(self, req_type, rep_type = None, req = None, rep = None): - """ Communication functionality - Send the request of given type and receive response of - given type(s). If no req or rep is given, use internal - objects. - """ - z = self.z - if req == None: req = self.req - if rep == None: rep = self.rep - req.type = req_type - req.connection_id = self.connection_id - self.zsocket.send(req.SerializeToString()) - req.Clear(); rep.Clear() - n = lambda x: x != None and z._MESSAGE_TYPE.values[x].name or "None" - comm_desc = "COMM %s - %s->%s:" % \ - (self.connection_id, - n(req_type), - " ".join(tuple(n(x) for x in type(rep_type) == tuple and rep_type or (rep_type,)))) - if rep_type is None: - print(comm_desc, "(without response)") - return None - rep.ParseFromString(self.zsocket.recv()) - if self.rep.type == z.MT_CLOSE: - print(comm_desc, "-> CLOSE:", rep.close.exception_message) - raise RuntimeError("Error: " + rep.close.exception_message) - if type(rep_type) in (list, tuple): - if rep.type not in rep_type: - print(comm_desc, "-> unexpected response:", repr(rep_type)) - raise RuntimeError("Unexpected message: " + repr(rep.type) + " not in " + repr(rep_type)) - elif rep.type != rep_type: - print(comm_desc, "-> unexpected response:", repr(rep_type)) - raise RuntimeError("Unexpected message: " + repr(rep.type) + " not in " + repr(rep_type)) - if self.connection_id != 0 and rep.connection_id != self.connection_id: - print(comm_desc, "-> wrong connection:", self.connection_id, "!=", rep.connection_id) - raise RuntimeError("Received wrong connection ID") - #print comm_desc, n(rep.type), "OK" - - - - -class Exa: - """ "exa" object - Initialy parses the meta information and present it to the - user exactly in the same way as in EXASolution internal UDF. - """ - - def __init__(self, comm): - self._z = comm.z - self._comm = comm - #self._comm.req.client = {} - self._comm.req.client.client_name = comm.client_name - self._comm.req.client.meta_info = '' - self._comm(z.MT_CLIENT, z.MT_INFO) - self._comm.connection_id = self._comm.rep.connection_id - - class exameta: pass - mo = exameta() - mo.database_name = self._comm.rep.info.database_name - mo.database_version = self._comm.rep.info.database_version - mo.script_name = self._comm.rep.info.script_name - mo.script_code = self._comm.rep.info.source_code - mo.script_schema = self._comm.rep.info.script_schema - mo.current_user = self._comm.rep.info.current_user - - print('SOURCE CODE: ', self._comm.rep.info.source_code) - - mo.script_language = "Python" - mo.session_id = self._comm.rep.info.session_id - mo.statement_id = self._comm.rep.info.statement_id - mo.node_count = self._comm.rep.info.node_count - mo.node_id = self._comm.rep.info.node_id - mo.vm_id = self._comm.rep.info.vm_id - self._meta_info = self._comm.rep.info.meta_info - - cfg_input_columns = [] - cfg_output_columns = [] - self._comm(z.MT_META, z.MT_META) - - self._single_call_mode = self._comm.rep.meta.single_call_mode - - if self._comm.rep.meta.input_iter_type == z.PB_EXACTLY_ONCE: - cfg_input_type = 'SCALAR' - elif self._comm.rep.meta.input_iter_type == z.PB_MULTIPLE: - cfg_input_type = 'SET' - else: raise RuntimeError("Unkown input iteration type") - - if self._comm.rep.meta.output_iter_type == z.PB_EXACTLY_ONCE: - cfg_output_type = 'RETURN' - elif self._comm.rep.meta.output_iter_type == z.PB_MULTIPLE: - cfg_output_type = 'EMIT' - else: raise RuntimeError("Unkown output iteration type") - - self._input_type = cfg_input_type - self._output_type = cfg_output_type - - for o, i in [(cfg_input_columns, self._comm.rep.meta.input_columns), - (cfg_output_columns, self._comm.rep.meta.output_columns)]: - for col in i: - info = { - 'name' : col.name, - 'pbtype' : col.type, - 'sqltype' : col.type_name, - 'length' : col.size, - 'precision' : col.precision, - 'scale' : col.scale, - } - if col.type == z.PB_DOUBLE: info['type'] = float - elif col.type == z.PB_INT32: info['type'] = int - elif col.type == z.PB_INT64: info['type'] = int - elif col.type == z.PB_NUMERIC: info['type'] = decimal.Decimal - elif col.type == z.PB_TIMESTAMP: info['type'] = datetime.datetime - elif col.type == z.PB_DATE: info['type'] = datetime.date - elif col.type == z.PB_STRING: info['type'] = unicode - elif col.type == z.PB_BOOLEAN: info['type'] = bool - else: raise RuntimeError("Received unknown column type") - if col.type in (z.PB_DOUBLE, z.PB_INT32, z.PB_INT64, z.PB_NUMERIC): - info['length']= None - o.append(info) - self._input_columns = cfg_input_columns - self._output_columns = cfg_output_columns - - mo.input_type = cfg_input_type - mo.output_type = cfg_output_type - mo.input_column_count = len(cfg_input_columns) - mo.output_column_count = len(cfg_output_columns) - class exacolumn: - def __init__(self, d): - self.name = d['name'] - self.type = d['type'] - self.sql_type = d['sqltype'] - self.precision = d.get('precision', None) - self.scale = d.get('scale', None) - self.length = d.get('length', None) - def __repr__(self): - return "" % tuple(repr(x) for x in ( - self.name, self.type, self.sql_type, self.precision, self.scale, self.length)) - mo.input_columns = [exacolumn(a) for a in cfg_input_columns] - mo.output_columns = [exacolumn(a) for a in cfg_output_columns] - self.meta = mo - self.__modules = {} - - def import_script(self, script): - """ Import script from the EXASolution database """ - z = self._z - getattr(self._comm.req, 'import').script_name = script - try: - self._comm(z.MT_IMPORT, z.MT_IMPORT) - if getattr(self._comm.rep, 'import').exception_message != '': - message = getattr(self._comm.rep, 'import').exception_message - raise RuntimeError(message) - code = getattr(self._comm.rep, 'import').source_code - except Exception as err: - raise ImportError(u'importing script %s failed: %s' % (script, str(err))) - print("IMPORT", self._comm.connection_id, repr(code), "cache", repr(self), repr(self.__modules)) - if self.__modules.has_key(code): - return self.__modules[code] - obj = imp.new_module(script) - obj.__file__ = '<%s>' % script - obj.__dict__['exa'] = self - self.__modules[code] = obj - try: exec(compile(code, script, 'exec'),obj.__dict__) - except Exception as err: - raise ImportError(u'importing module %s failed: %s' % (script, str(err))) - return obj - -class Exaiter: - """ Data iterator - Allows to iterate through the input data. Initially creates - functions to properly convert input data to Python objects. - """ - - def __init__(self, exa): - comm = exa._comm; z = comm.z - self.__comm = comm - self.__z = z - self.__input = z.exascript_response() - self.__output = z.exascript_request() - self.__row_num = 0 - self.__finished = False - self.__colnums = {} - - # define the read_row function, which reads a full row from - # received data - def convert_date(x): - val = datetime.datetime.strptime(x, "%Y-%m-%d") - return datetime.date(val.year, val.month, val.day) - def convert_timestamp(x): - return datetime.datetime.strptime(x, "%Y-%m-%d %H:%M:%S.%f") - self.__input_offsets = {} - input_types = [] - input_converters = [] - readfuns = [] - colnum = 0 - for col in exa._input_columns: - converter = col['type'] - if col['pbtype'] == z.PB_DOUBLE: - input_types.append(('double', colnum, - lambda x: self.__input.next.table.data_double[x])) - elif col['pbtype'] == z.PB_INT32: - input_types.append(('int32', colnum, - lambda x: self.__input.next.table.data_int32[x])) - elif col['pbtype'] == z.PB_INT64: - input_types.append(('int64', colnum, - lambda x: self.__input.next.table.data_int64[x])) - elif col['pbtype'] == z.PB_BOOLEAN: - input_types.append(('bool', colnum, - lambda x: self.__input.next.table.data_bool[x])) - elif col['pbtype'] in (z.PB_NUMERIC, z.PB_TIMESTAMP, z.PB_DATE, z.PB_STRING): - if col['pbtype'] == z.PB_DATE: converter = convert_date - elif col['pbtype'] == z.PB_TIMESTAMP: converter = convert_timestamp - input_types.append(('string', colnum, lambda x: self.__input.next.table.data_string[x])) - else: raise RuntimeError("Unknown type") - self.__colnums[col['name']] = colnum - self.__colnums[unicode(colnum)] = colnum - colnum += 1 - input_converters.append(converter) - - - def read_row(): - self.__row = [] - offset = self.__input_offsets.get('external_row_number',0) - self.__input_offsets['external_row_number'] = offset + 1 - self.__external_row_number = self.__input.next.table.row_number[offset] - for ct, cn, cr in input_types: - offset = self.__input_offsets.get('null', 0) - self.__input_offsets['null'] = offset + 1 - if offset >= len(self.__input.next.table.data_nulls): - table = self.__input.next.table - if self.__input.next.table.data_nulls[offset]: - self.__row.append(None) - else: - offset = self.__input_offsets.get(ct, 0) - self.__input_offsets[ct] = offset + 1 - self.__row.append(input_converters[cn](cr(offset))) - - self.__read_row = read_row - - - - # define the write_row function, which writes a full row to - # the output buffer - writefuns = [] - def get_data(x, c, n, t): - if type(x) not in t: - raise RuntimeError("Column is of type %s but data given have type %s" % (n, str(type(x)))) - return c(x) - for col in exa._output_columns: - if col['pbtype'] == z.PB_DOUBLE: - writefuns.append(lambda x: self.__output.emit.table.data_double.append(get_data(x, float, 'float', (float, int, int, decimal.Decimal))) or 12) - elif col['pbtype'] == z.PB_INT32: - writefuns.append(lambda x: self.__output.emit.table.data_int32.append(get_data(x, int, 'int', (float, int, int, decimal.Decimal))) or 4) - elif col['pbtype'] == z.PB_INT64: - writefuns.append(lambda x: self.__output.emit.table.data_int64.append(get_data(x, int, 'int', (float, int, int, decimal.Decimal))) or 8) - elif col['pbtype'] in (z.PB_NUMERIC, z.PB_STRING): - def wf(x): - d = unicode(x) - self.__output.emit.table.data_string.append(d) - return len(d) - writefuns.append(wf) - elif col['pbtype'] == z.PB_TIMESTAMP: - def wf(x): - d = x.isoformat(' ') - self.__output.emit.table.data_string.append(d) - return len(d) - writefuns.append(wf) - elif col['pbtype'] == z.PB_DATE: - def wf(x): - d = x.isoformat() - self.__output.emit.table.data_string.append(d) - return len(d) - writefuns.append(wf) - elif col['pbtype'] == z.PB_BOOLEAN: - writefuns.append(lambda x: self.__output.emit.table.data_bool.append(get_data(x, bool, 'bool', (bool,))) or 1) - - def write_row(row): - self.__output.emit.table.row_number.append(self.__external_row_number) - self.__written_bytes += 8 # (row_number is INT64) - for d, f in zip(row, writefuns): - if d != None: - self.__written_bytes += f(d) - self.__output.emit.table.data_nulls.append(False) - else: self.__output.emit.table.data_nulls.append(True) - self.__written_bytes += 1 - self.__output.emit.table.rows += 1 - - self.__written_bytes = 0 - self.__write_row = write_row - self.__output_columns_count = len(exa._output_columns) - self.__external_row_number = -1 - - # read first block - #self.next() - - def __getitem__(self, key): - # allows to access columns with this syntax: it["colname"] - key = unicode(key) - if key not in self.__colnums: - raise RuntimeError(u"Column with name '%s' does not exist" % key) - if self.__finished: raise RuntimeError("Iteration finished") - return self.__row[self.__colnums[key]] - - def __getattr__(self, key): - # allows to access columns with this syntax: it.colname - key = unicode(key) - if key not in self.__colnums: - raise RuntimeError(u"Column with name '%s' does not exist" % key) - if self.__finished: raise RuntimeError("Iteration finished") - return self.__row[self.__colnums[key]] - - def emit(self, *output): - # emits one row - # if size of all emmited rows in the buffer is larger then - # MAX_DATASIZE, then flush the buffer. - comm = self.__comm; z = self.__z - if len(output) == self.__output_columns_count: - self.__write_row(output) - limit = MAX_DATASIZE - elif len(output) == 0: limit = 0 - else: raise RuntimeError("Emited wrong number of columns") - if self.__written_bytes > limit: - print("EMIT", comm.connection_id, "bytes sent:", self.__written_bytes) - self.__output.emit.table.rows_in_group = 0 - comm(z.MT_EMIT, z.MT_EMIT, req = self.__output) - self.__written_bytes = 0 - return None - - def next(self, reset = False, first = False): - # reads next row - # if no rows available, fetch next block - # if still no rows available, return False to indicate, that - # no more rows available - # if reset is True, fetch first block of current group and - # beginn with first row - # returns True to indicate, that next row was fetched. - comm = self.__comm; z = self.__z - if first or reset: self.__finished = False - elif self.__finished: return False - if self.__row_num == 0 or self.__row_num >= self.__input.next.table.rows: - if reset: comm(z.MT_RESET, (z.MT_RESET, z.MT_DONE), rep = self.__input) - else: comm(z.MT_NEXT, (z.MT_NEXT, z.MT_DONE), rep = self.__input) - self.__row_num = 0 - for ot in self.__input_offsets.keys(): - self.__input_offsets[ot] = 0 - if self.__input.type == z.MT_DONE: - print("ITER", comm.connection_id, "read next row", "(finished)") - self.__finished = True - return False - else: -# print("ITER", comm.connection_id, "read next row", self.__input.next.table.rows) - print("ITER", comm.connection_id, "read next row", self.__input.next.table) - self.__read_row() - self.__row_num += 1 - return True - - def reset(self): - return self.next(reset = True) - - def size(self): - # returns number of rows in current group - if self.__finished: return 0 - return self.__input.next.table.rows_in_group - -def disallowed_function(*args, **kw): - raise RuntimeError("next(), reset() and emit() functions are not allowed in scalar context") - -#__all__ = ['comm', 'exaiter', 'disallowed_function'] - - -###################################################################################### - - -if len(sys.argv) != 2: - print("Usage:", sys.argv[0], "") - sys.exit(1) - -#import library -print("Ready...") -print(sys.argv) - -# initialize the 'exa' variable, which holds the metadata -exa = Exa(Comm(sys.argv[1])) - - -## def run(ctx): -## ctx.emit(1.4) -## -## def default_output_columns(): -## return 'x double' - -try: - # execute the script code - try: exec(compile(exa.meta.script_code, exa.meta.script_name, 'exec'))#,obj.__dict__) - except Exception as err: - raise ImportError(u'compiling the script %s failed: %s' % (exa.meta.script_name, str(err))) - - #exec(exa.meta.script_code) - - - run = globals()['run'] - if 'cleanup' in globals(): - cleanup = globals()['cleanup'] - - - single_call_names = {exa._z.SC_FN_DEFAULT_OUTPUT_COLUMNS:'default_output_columns'} - single_call_fns = {} - for k,v in single_call_names.items(): - if v in globals(): - single_call_fns[k] = globals()[v] - else: - single_call_fns[k] = None - - it = Exaiter(exa) - # store original values of it.next and it.emit to disallow these functions in the iterator - it_next = it.next; it_emit = it.emit - if exa._input_type == 'SCALAR': - # on scalar UDFs user should not call next and reset functions but simply return, because the run function is - # called per row - it.next = it.reset = disallowed_function - if exa._output_type == 'RETURN': - # on return UDFs user should not call emit function but simply return the value, which should be emited. - it.emit = disallowed_function - - print("Single Call Mode: ", exa._single_call_mode) - - if exa._single_call_mode: - while True: - # indicate that we are ready for requests by sending MT_RUN - exa._comm(exa._z.MT_RUN, (exa._z.MT_RUN, exa._z.MT_CLEANUP,exa._z.MT_CALL,exa._z.MT_FINISHED)) - if exa._comm.rep.type == exa._z.MT_CLEANUP: break - if exa._comm.rep.type == exa._z.MT_FINISHED: break - if exa._comm.rep.type == exa._z.MT_CALL: - sc_rep = exa._comm.rep.call - fn = sc_rep.fn - assert fn != exa._z.SC_FN_NIL - args = sc_rep.args - call_f = single_call_fns[fn] - # - if not call_f: - # no such function defined here - exa._comm.req.type = exa._z.MT_UNDEFINED_CALL - if fn in single_call_names: - exa._comm.req.undefined_call.remote_fn = single_call_names[fn] - else: - exa._comm.req.undefined_call.remote_fn = '' - exa._comm(exa._z.MT_UNDEFINED_CALL, (exa._z.MT_UNDEFINED_CALL)) - else: - res = None - if len(args)>0: - res = call_f(args) - else: - res = call_f() - exa._comm.req.type = exa._z.MT_RETURN - exa._comm.req.call_result.result = res - exa._comm(exa._z.MT_RETURN,(exa._z.MT_RETURN)) - # finish current group - exa._comm(exa._z.MT_DONE, (exa._z.MT_DONE, exa._z.MT_CLEANUP)) - if exa._comm.rep.type == exa._z.MT_CLEANUP: break - else: - # the main loop, one iteration per group, which is randomaly choosen by EXASolution or if GROUP BY is used in the - # query and the function is a SET function, corresponds to the GROUP BY groups. - while True: - # initiate the itereration, returns MT_RUN or, if no more groups available, returns MT_CLEANUP, in which we - # break the main loop - exa._comm(exa._z.MT_RUN, (exa._z.MT_RUN, exa._z.MT_CLEANUP)) - if exa._comm.rep.type == exa._z.MT_CLEANUP: - #tprint("MT_CLEANUP1 .. break") - break - - # Read first block of rows - if not it_next(first = True): - raise RuntimeError("First block fetch failed") - - if exa._input_type == 'SET': - # the UDF is a 'SET' function, so let the iteration be done by the user in the run function - if exa._output_type == 'RETURN': - # the UDF is a 'RETURN' function, so the run function returns the value, which should be emited - #tprint("WORK", exa._comm.connection_id, "in mode SET->RETURN") - it_emit(run(it)) - elif exa._output_type == 'EMIT': - # the UDF is a 'EMIT' function, so the run function emits the data by self - #tprint("WORK", exa._comm.connection_id, "in mode SET->EMIT") - run(it) - else: raise RuntimeError("Internal error - unknown output mode") - - elif exa._input_type == 'SCALAR': - # the UDF is a 'SCALAR' function, so the run function is called on per row basis. - if exa._output_type == 'RETURN': - #tprint("WORK", exa._comm.connection_id, "in mode SCALAR->RETURN") - while True: - it_emit(run(it)) - if not it_next(): - #tprint("SCALAR-RETURN: no next() .. break") - break - elif exa._output_type == 'EMIT': - #tprint("WORK", exa._comm.connection_id, "in mode SCALAR->EMIT") - while True: - run(it) - if not it_next(): - #tprint("SCALAR-EMIT: no next() .. break") - break - else: raise RuntimeError("Internal error - unknown output mode") - - else: raise RuntimeError("Internal error - unknown input mode") - it_emit() # flush the internal output buffer - - # finish current group - exa._comm(exa._z.MT_DONE, (exa._z.MT_DONE, exa._z.MT_CLEANUP)) - if exa._comm.rep.type == exa._z.MT_CLEANUP: - #tprint("MT_CLEANUP2 .. break") - break - - # all groups are processed, call 'cleanup' function if defined - if 'cleanup' in globals(): - cleanup() - try: exa._comm(exa._z.MT_FINISHED, exa._z.MT_FINISHED) - except Exception: pass # last sync, error here are not important - -except Exception as err: - # print backtrace on error - errtypel, errobj, backtrace = sys.exc_info() - exa._comm.req.close.exception_message = "%s: %s" % (err.__class__.__name__, str(err).strip()) - print("ERROR", exa._comm.connection_id, ":", exa._comm.req.close.exception_message) - print("".join(traceback.format_exception(errtypel, errobj, backtrace))) - exa._comm(exa._z.MT_CLOSE) - sys.exit(1) - -tprint("EXITING") -#os._exit(0) - diff --git a/python_client/python2/client b/python_client/python2/client deleted file mode 100755 index 1cc48601e..000000000 --- a/python_client/python2/client +++ /dev/null @@ -1,7 +0,0 @@ -#!/usr/bin/python2 - -import sys, os - -sys.path.insert(0,os.path.realpath(os.path.join(os.path.dirname(os.path.abspath(__file__)), '..'))) - -import common_client diff --git a/python_client/python3/client b/python_client/python3/client deleted file mode 100755 index a9d01b215..000000000 --- a/python_client/python3/client +++ /dev/null @@ -1,7 +0,0 @@ -#!/usr/bin/python3 - -import sys, os - -sys.path.insert(0,os.path.realpath(os.path.join(os.path.dirname(os.path.abspath(__file__)), '..'))) - -import common_client