Skip to content

Commit

Permalink
support iceberg source
Browse files Browse the repository at this point in the history
save

fix ci
  • Loading branch information
xxhZs committed Feb 10, 2025
1 parent 232f7db commit 776bb4a
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 8 deletions.
1 change: 1 addition & 0 deletions e2e_test/iceberg/test_case/iceberg_engine.slt
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ v_bool boolean,
v_date date,
v_timestamp timestamptz,
v_ts_ntz timestamp,
v_timestamp_ns timestamp_ns,
v_decimal decimal,
v_map map(int, int),
v_array int[],
Expand Down
4 changes: 2 additions & 2 deletions risedev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ profile:
# ENABLE_PRETTY_LOG: "true"
steps:
# If you want to use the local s3 storage, enable the following line
# - use: minio
- use: minio

# If you want to use aws-s3, configure AK and SK in env var and enable the following lines:
# - use: aws-s3
Expand All @@ -48,7 +48,7 @@ profile:
- use: frontend

# If you want to enable compactor, uncomment the following line, and enable either minio or aws-s3 as well.
# - use: compactor
- use: compactor

# If you want to create source from Kafka, uncomment the following lines
# - use: kafka
Expand Down
4 changes: 2 additions & 2 deletions src/common/src/array/arrow/arrow_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ pub trait FromArrow {
Timestamp(Second, Some(_)) => DataType::Timestamptz,
Timestamp(Millisecond, None) => DataType::Timestamp,
Timestamp(Millisecond, Some(_)) => DataType::Timestamptz,
Timestamp(Nanosecond, None) => DataType::Timestamp,
Timestamp(Nanosecond, None) => DataType::TimestampNanosecond,
Timestamp(Nanosecond, Some(_)) => DataType::Timestamptz,
Interval(MonthDayNano) => DataType::Interval,
Utf8 => DataType::Varchar,
Expand Down Expand Up @@ -815,7 +815,7 @@ pub trait FromArrow {
&self,
array: &arrow_array::TimestampNanosecondArray,
) -> Result<ArrayImpl, ArrayError> {
Ok(ArrayImpl::Timestamp(array.into()))
Ok(ArrayImpl::TimestampNanosecond(array.into()))
}

fn from_timestampns_some_array(
Expand Down
21 changes: 19 additions & 2 deletions src/connector/src/source/iceberg/parquet_file_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ use opendal::Operator;
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{parquet_to_arrow_schema, ParquetRecordBatchStreamBuilder, ProjectionMask};
use parquet::file::metadata::{FileMetaData, ParquetMetaData, ParquetMetaDataReader};
use risingwave_common::array::arrow::arrow_schema_udf::{DataType as ArrowDateType, IntervalUnit};
use risingwave_common::array::arrow::arrow_schema_udf::{
DataType as ArrowDateType, IntervalUnit, TimeUnit,
};
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::ColumnId;
Expand Down Expand Up @@ -396,7 +398,22 @@ fn is_parquet_schema_match_source_schema(
RwDataType::Float32
)
| (ArrowDateType::Float64, RwDataType::Float64)
| (ArrowDateType::Timestamp(_, None), RwDataType::Timestamp)
| (
ArrowDateType::Timestamp(TimeUnit::Second, None),
RwDataType::Timestamp
)
| (
ArrowDateType::Timestamp(TimeUnit::Millisecond, None),
RwDataType::Timestamp
)
| (
ArrowDateType::Timestamp(TimeUnit::Microsecond, None),
RwDataType::Timestamp
)
| (
ArrowDateType::Timestamp(TimeUnit::Nanosecond, None),
RwDataType::TimestampNanosecond
)
| (
ArrowDateType::Timestamp(_, Some(_)),
RwDataType::Timestamptz
Expand Down
27 changes: 26 additions & 1 deletion src/expr/impl/src/scalar/date_trunc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::types::{Interval, Timestamp, Timestamptz};
use risingwave_common::types::{Interval, Timestamp, TimestampNanosecond, Timestamptz};
use risingwave_expr::{function, ExprError, Result};

use super::timestamptz::timestamp_at_time_zone;

// TODO(xiangjinwu): parse into an enum
const MICROSECONDS: &str = "microseconds";
const MILLISECONDS: &str = "milliseconds";
const NANOSECONDS: &str = "nanoseconds";
const SECOND: &str = "second";
const MINUTE: &str = "minute";
const HOUR: &str = "hour";
Expand All @@ -32,6 +33,30 @@ const DECADE: &str = "decade";
const CENTURY: &str = "century";
const MILLENNIUM: &str = "millennium";

#[function("date_trunc(varchar, timestamp_ns) -> timestamp_ns")]
pub fn date_trunc_timestamp_ns(
field: &str,
ts: TimestampNanosecond,
) -> Result<TimestampNanosecond> {
Ok(match field.to_ascii_lowercase().as_str() {
NANOSECONDS => ts,
MICROSECONDS => ts.truncate_micros(),
MILLISECONDS => ts.truncate_millis(),
SECOND => ts.truncate_second(),
MINUTE => ts.truncate_minute(),
HOUR => ts.truncate_hour(),
DAY => ts.truncate_day(),
WEEK => ts.truncate_week(),
MONTH => ts.truncate_month(),
QUARTER => ts.truncate_quarter(),
YEAR => ts.truncate_year(),
DECADE => ts.truncate_decade(),
CENTURY => ts.truncate_century(),
MILLENNIUM => ts.truncate_millennium(),
_ => return Err(invalid_field_error(field)),
})
}

#[function("date_trunc(varchar, timestamp) -> timestamp")]
pub fn date_trunc_timestamp(field: &str, ts: Timestamp) -> Result<Timestamp> {
Ok(match field.to_ascii_lowercase().as_str() {
Expand Down
11 changes: 10 additions & 1 deletion src/expr/impl/src/scalar/timestamptz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ use std::fmt::Write;
use chrono::LocalResult;
use num_traits::CheckedNeg;
use risingwave_common::types::{
write_date_time_tz, CheckedAdd, Interval, IntoOrdered, Timestamp, Timestamptz, F64,
write_date_time_tz, CheckedAdd, Interval, IntoOrdered, Timestamp, TimestampNanosecond,
Timestamptz, F64,
};
use risingwave_expr::{function, ExprError, Result};
use thiserror_ext::AsReport;
Expand Down Expand Up @@ -49,6 +50,14 @@ pub fn timestamptz_at_time_zone(input: Timestamptz, time_zone: &str) -> Result<T
Ok(Timestamp(naive))
}

#[function("at_time_zone(timestamp_ns, varchar) -> timestamptz")]
pub fn timestamp_ns_at_time_zone(
input: TimestampNanosecond,
time_zone: &str,
) -> Result<Timestamptz> {
timestamp_at_time_zone(input.into(), time_zone)
}

#[function("at_time_zone(timestamp, varchar) -> timestamptz")]
pub fn timestamp_at_time_zone(input: Timestamp, time_zone: &str) -> Result<Timestamptz> {
let time_zone = Timestamptz::lookup_time_zone(time_zone).map_err(time_zone_err)?;
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/expr/session_timezone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ impl SessionTimezone {
// => `(input_timestamptz AT TIME ZONE zone_string)::time`
// `input_timestamptz::timestamp`
// => `input_timestamptz AT TIME ZONE zone_string`
// `input_timestamptz::timestamp_ns`
// => `input_timestamptz AT TIME ZONE zone_string`
ExprType::Cast => {
assert_eq!(inputs.len(), 1);
let mut input = inputs[0].clone();
Expand Down

0 comments on commit 776bb4a

Please sign in to comment.