Skip to content

Commit

Permalink
working on getobjects
Browse files Browse the repository at this point in the history
  • Loading branch information
David Coe committed Sep 21, 2023
1 parent b479cd0 commit 0116e21
Show file tree
Hide file tree
Showing 7 changed files with 312 additions and 88 deletions.
163 changes: 155 additions & 8 deletions csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs
Original file line number Diff line number Diff line change
@@ -1,29 +1,44 @@
using System;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading;
using Apache.Arrow.Adbc;
using System.Threading.Tasks;
using Apache.Arrow.Ipc;
using Apache.Hive.Service.Rpc.Thrift;
using Thrift;
using Thrift.Protocol;
using Thrift.Transport;

namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
{
public abstract class HiveServer2Connection : AdbcConnection
{
const string userAgent = "PowerBiExperimental/0.0";
const string userAgent = "AdbcExperimental/0.0";

protected TOperationHandle operationHandle;
protected IReadOnlyDictionary<string, string> properties;
internal TTransport transport;
internal TCLIService.Client client;
internal TSessionHandle sessionHandle;

/// <summary>
/// Creates a connection with no properties by delegating to the
/// single-argument constructor with a null argument.
/// </summary>
internal HiveServer2Connection() : this(null) { }

internal HiveServer2Connection(IReadOnlyDictionary<string, string> properties)
Expand All @@ -44,6 +59,71 @@ public void Open()
protected abstract TProtocol CreateProtocol();
protected abstract TOpenSessionReq CreateSessionRequest();

/// <summary>
/// Issues a Thrift GetColumns request for the given patterns and returns a
/// stream over its results.
/// </summary>
/// <param name="depth">
/// How deep the listing should go. Only <see cref="GetObjectsDepth.All"/> is
/// implemented; any other value throws.
/// </param>
/// <param name="catalogPattern">Assigned to the request's CatalogName.</param>
/// <param name="dbSchemaPattern">Assigned to the request's SchemaName.</param>
/// <param name="tableNamePattern">Assigned to the request's TableName.</param>
/// <param name="tableTypes">Currently unused.</param>
/// <param name="columnNamePattern">
/// Assigned to the request's ColumnName when it is neither null nor empty.
/// </param>
/// <returns>A stream that reads the result of the GetColumns operation.</returns>
/// <exception cref="NotImplementedException">
/// <paramref name="depth"/> is not <see cref="GetObjectsDepth.All"/>.
/// </exception>
/// <exception cref="AdbcException">The server reported ERROR_STATUS.</exception>
public override IArrowArrayStream GetObjects(GetObjectsDepth depth, string catalogPattern, string dbSchemaPattern, string tableNamePattern, List<string> tableTypes, string columnNamePattern)
{
    // Only the column-level listing actually sends a request. Failing fast here
    // avoids polling and reading the schema of whatever operation handle
    // (possibly none) a previous call left on this connection.
    if (depth != GetObjectsDepth.All)
    {
        throw new NotImplementedException($"GetObjects is only implemented for GetObjectsDepth.All; received {depth}.");
    }

    TGetColumnsReq columnsReq = new TGetColumnsReq(this.sessionHandle);
    columnsReq.CatalogName = catalogPattern;
    columnsReq.SchemaName = dbSchemaPattern;
    columnsReq.TableName = tableNamePattern;

    if (!string.IsNullOrEmpty(columnNamePattern))
    {
        columnsReq.ColumnName = columnNamePattern;
    }

    var columnsResponse = this.client.GetColumns(columnsReq).Result;
    if (columnsResponse.Status.StatusCode == TStatusCode.ERROR_STATUS)
    {
        throw new AdbcException(columnsResponse.Status.ErrorMessage);
    }

    // PollForResponse and GetSchema both read this field, so it must be set first.
    this.operationHandle = columnsResponse.OperationHandle;

    PollForResponse();

    Schema schema = GetSchema();

    return new GetObjectsReader(this, schema);
}

/// <summary>Not implemented; always throws.</summary>
/// <exception cref="NotImplementedException">Always.</exception>
public override IArrowArrayStream GetInfo(List<int> codes) => throw new NotImplementedException();

/// <summary>Not implemented; always throws.</summary>
/// <exception cref="NotImplementedException">Always.</exception>
public override IArrowArrayStream GetTableTypes() => throw new NotImplementedException();

/// <summary>
/// Blocks the calling thread until the operation referenced by
/// <c>operationHandle</c> is reported in a state other than PENDING_STATE or
/// RUNNING_STATE, waiting 500 ms between consecutive status requests.
/// </summary>
protected void PollForResponse()
{
    bool isFirstRequest = true;
    while (true)
    {
        // No delay before the very first status request.
        if (!isFirstRequest)
        {
            Thread.Sleep(500);
        }
        isFirstRequest = false;

        var statusRequest = new TGetOperationStatusReq(this.operationHandle);
        var status = this.client.GetOperationStatus(statusRequest).Result;

        var state = status.OperationState;
        if (state != TOperationState.PENDING_STATE && state != TOperationState.RUNNING_STATE)
        {
            return;
        }
    }
}


public override void Dispose()
{
if (this.client != null)
Expand All @@ -57,5 +137,72 @@ public override void Dispose()
this.client = null;
}
}

/// <summary>
/// Requests the result-set metadata for the current <c>operationHandle</c>
/// and converts the returned Thrift schema with <c>SchemaParser.GetArrowSchema</c>.
/// </summary>
/// <returns>The schema produced by <c>SchemaParser.GetArrowSchema</c>.</returns>
protected Schema GetSchema()
{
    var metadataRequest = new TGetResultSetMetadataReq(this.operationHandle);
    var metadata = this.client.GetResultSetMetadata(metadataRequest).Result;
    return SchemaParser.GetArrowSchema(metadata.Schema);
}

/// <summary>
/// Streams the record batches of the operation currently referenced by the
/// owning connection's <c>operationHandle</c>. Results are fetched from the
/// server on demand, and each returned Arrow batch is decoded through its own
/// <see cref="ArrowStreamReader"/>.
/// </summary>
sealed class GetObjectsReader : IArrowArrayStream
{
    // Third argument passed to TFetchResultsReq for every fetch.
    // NOTE(review): presumably the maximum row count per fetch — confirm against the Thrift IDL.
    const int FetchSize = 50000;

    // Set to null once the server reports no more rows (or on Dispose); a null
    // connection means no further fetches are issued.
    HiveServer2Connection connection;
    readonly Schema schema;

    // Batches from the most recent fetch, and the position of the next one to decode.
    List<TSparkArrowBatch> batches;
    int index;

    // Reader over the batch currently being drained, or null when none is open.
    IArrowReader reader;

    /// <summary>Creates a reader bound to the given connection and schema.</summary>
    /// <param name="connection">Connection whose client and operation handle are used to fetch results.</param>
    /// <param name="schema">Schema exposed by <see cref="Schema"/> and passed to each chunk stream.</param>
    public GetObjectsReader(HiveServer2Connection connection, Schema schema)
    {
        this.connection = connection;
        this.schema = schema;
    }

    /// <summary>The schema supplied at construction.</summary>
    public Schema Schema { get { return schema; } }

    /// <summary>
    /// Returns the next record batch, fetching more results from the server when
    /// the locally held batches are exhausted.
    /// </summary>
    /// <param name="cancellationToken">Passed to the underlying reader and to the fetch call.</param>
    /// <returns>The next batch, or null when there is nothing left to read.</returns>
    public async ValueTask<RecordBatch> ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)
    {
        while (true)
        {
            // 1. Drain the reader that is currently open, if any.
            if (this.reader != null)
            {
                RecordBatch next = await this.reader.ReadNextRecordBatchAsync(cancellationToken);
                if (next != null)
                {
                    return next;
                }
                ReleaseReader();
            }

            // 2. Open a reader over the next batch from the last fetch.
            if (this.batches != null && this.index < this.batches.Count)
            {
                this.reader = new ArrowStreamReader(new ChunkStream(this.schema, this.batches[this.index++].Batch));
                continue;
            }

            this.batches = null;
            this.index = 0;

            // 3. Nothing buffered: stop if finished, otherwise fetch again.
            if (this.connection == null)
            {
                return null;
            }

            TFetchResultsReq request = new TFetchResultsReq(this.connection.operationHandle, TFetchOrientation.FETCH_NEXT, FetchSize);
            TFetchResultsResp response = await this.connection.client.FetchResults(request, cancellationToken);
            this.batches = response.Results.ArrowBatches;

            if (!response.HasMoreRows)
            {
                this.connection = null;
            }
        }
    }

    /// <summary>
    /// Releases the open per-batch reader and drops buffered state, so that later
    /// reads return null. The connection itself is not disposed.
    /// </summary>
    public void Dispose()
    {
        ReleaseReader();
        this.batches = null;
        this.index = 0;
        this.connection = null;
    }

    // Disposes the current per-batch reader when its concrete type is disposable, then clears it.
    void ReleaseReader()
    {
        (this.reader as IDisposable)?.Dispose();
        this.reader = null;
    }
}
}
}
78 changes: 19 additions & 59 deletions csharp/src/Drivers/Apache/Hive2/HiveServer2Statement.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,22 @@
using System;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System;
using System.Threading;
using Apache.Arrow;
using Apache.Arrow.Adbc;
using Apache.Arrow.Types;
using Apache.Hive.Service.Rpc.Thrift;

namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
Expand Down Expand Up @@ -49,7 +63,7 @@ protected Schema GetSchema()
{
TGetResultSetMetadataReq request = new TGetResultSetMetadataReq(this.operationHandle);
TGetResultSetMetadataResp response = this.connection.client.GetResultSetMetadata(request).Result;
return GetArrowSchema(response.Schema);
return SchemaParser.GetArrowSchema(response.Schema);
}

public override void Dispose()
Expand All @@ -64,60 +78,6 @@ public override void Dispose()
base.Dispose();
}

/// <summary>
/// Builds an Arrow schema with one field per column of the Thrift table schema.
/// Each field takes the column's name and the Arrow type mapped from the first
/// entry of the column's type descriptor.
/// </summary>
/// <param name="thriftSchema">The Thrift table schema to convert.</param>
/// <returns>A schema with the converted fields and null metadata.</returns>
static Schema GetArrowSchema(TTableSchema thriftSchema)
{
    Field[] arrowFields = new Field[thriftSchema.Columns.Count];
    int position = 0;
    foreach (TColumnDesc columnDesc in thriftSchema.Columns)
    {
        IArrowType arrowType = GetArrowType(columnDesc.TypeDesc.Types[0]);

        // Every field is declared nullable; the original author flagged this as an open question.
        arrowFields[position] = new Field(columnDesc.ColumnName, arrowType, nullable: true);
        position++;
    }
    return new Schema(arrowFields, null);
}

/// <summary>
/// Maps a Thrift type entry to an Arrow type. Only entries that carry a
/// primitive entry are supported.
/// </summary>
/// <param name="thriftType">The Thrift type entry to convert.</param>
/// <returns>The Arrow type mapped from the entry's primitive type id.</returns>
/// <exception cref="InvalidOperationException">The entry has no primitive entry.</exception>
static IArrowType GetArrowType(TTypeEntry thriftType)
{
    var primitive = thriftType.PrimitiveEntry;
    if (primitive == null)
    {
        throw new InvalidOperationException();
    }
    return GetArrowType(primitive.Type);
}

/// <summary>
/// Maps a Thrift type id to an Arrow type. Type ids with no dedicated Arrow
/// mapping here (decimal, intervals, nested and user-defined types) are
/// returned as strings.
/// </summary>
/// <param name="thriftType">The Thrift type id to convert.</param>
/// <returns>The corresponding Arrow type.</returns>
/// <exception cref="NotImplementedException">The type id is not handled below.</exception>
static IArrowType GetArrowType(TTypeId thriftType)
{
    switch (thriftType)
    {
        // Integers
        case TTypeId.TINYINT_TYPE:
            return Int8Type.Default;
        case TTypeId.SMALLINT_TYPE:
            return Int16Type.Default;
        case TTypeId.INT_TYPE:
            return Int32Type.Default;
        case TTypeId.BIGINT_TYPE:
            return Int64Type.Default;

        // Floating point
        case TTypeId.FLOAT_TYPE:
            return FloatType.Default;
        case TTypeId.DOUBLE_TYPE:
            return DoubleType.Default;

        // Other primitives
        case TTypeId.BOOLEAN_TYPE:
            return BooleanType.Default;
        case TTypeId.BINARY_TYPE:
            return BinaryType.Default;
        case TTypeId.NULL_TYPE:
            return NullType.Default;

        // Temporal
        case TTypeId.DATE_TYPE:
            return Date32Type.Default;
        case TTypeId.TIMESTAMP_TYPE:
            return new TimestampType(TimeUnit.Microsecond, (string)null);

        // Textual types, plus everything currently surfaced as text
        // (the original author marked decimal and the intervals as an open question).
        case TTypeId.CHAR_TYPE:
        case TTypeId.STRING_TYPE:
        case TTypeId.VARCHAR_TYPE:
        case TTypeId.DECIMAL_TYPE:
        case TTypeId.INTERVAL_DAY_TIME_TYPE:
        case TTypeId.INTERVAL_YEAR_MONTH_TYPE:
        case TTypeId.ARRAY_TYPE:
        case TTypeId.MAP_TYPE:
        case TTypeId.STRUCT_TYPE:
        case TTypeId.UNION_TYPE:
        case TTypeId.USER_DEFINED_TYPE:
            return StringType.Default;
    }

    throw new NotImplementedException();
}
}
}
19 changes: 2 additions & 17 deletions csharp/src/Drivers/Apache/Spark/SparkConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
{
public class SparkConnection : HiveServer2Connection
{
const string userAgent = "PowerBiExperimental/0.0";
const string userAgent = "AdbcExperimental/0.0";

internal static readonly Dictionary<string, string> timestampConfig = new Dictionary<string, string>
{
Expand All @@ -40,7 +40,7 @@ public class SparkConnection : HiveServer2Connection

/// <summary>
/// Creates a connection with no properties by delegating to the
/// single-argument constructor with a null argument.
/// </summary>
public SparkConnection() : this(null) { }

internal SparkConnection(IReadOnlyDictionary<string, string> properties)
Expand Down Expand Up @@ -98,20 +98,5 @@ public override void Dispose()
this.client = null;
}
}

/// <summary>Not implemented; always throws.</summary>
/// <exception cref="NotImplementedException">Always.</exception>
public override IArrowArrayStream GetInfo(List<int> codes) => throw new NotImplementedException();

/// <summary>Not implemented; always throws.</summary>
/// <exception cref="NotImplementedException">Always.</exception>
public override IArrowArrayStream GetObjects(GetObjectsDepth depth, string catalogPattern, string dbSchemaPattern, string tableNamePattern, List<string> tableTypes, string columnNamePattern)
    => throw new NotImplementedException();

/// <summary>Not implemented; always throws.</summary>
/// <exception cref="NotImplementedException">Always.</exception>
public override IArrowArrayStream GetTableTypes() => throw new NotImplementedException();
}
}
20 changes: 18 additions & 2 deletions csharp/src/Drivers/Apache/Thrift/ChunkStream.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,24 @@
using System;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Apache.Arrow;
using Apache.Arrow.Ipc;

namespace Apache.Arrow.Adbc.Drivers.Apache
Expand Down
19 changes: 18 additions & 1 deletion csharp/src/Drivers/Apache/Thrift/IPeekableTransport.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,21 @@
using System.IO;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System.IO;

namespace Apache.Arrow.Adbc.Drivers.Apache
{
Expand Down
Loading

0 comments on commit 0116e21

Please sign in to comment.