Skip to content

Commit

Permalink
Write to Parquet by using JSON writer (supports complex data structures)
Browse files Browse the repository at this point in the history
  • Loading branch information
exAspArk committed Nov 12, 2024
1 parent 5de914a commit b2e6afe
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 20 deletions.
8 changes: 5 additions & 3 deletions src/pg_schema_column.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type IcebergSchemaField struct {
Required bool
}

func (pgSchemaColumn PgSchemaColumn) ToStringParquetSchemaField() string {
func (pgSchemaColumn PgSchemaColumn) ToParquetSchemaFieldMap() map[string]interface{} {
field := pgSchemaColumn.toParquetSchemaField()

keyVals := []string{
Expand All @@ -72,10 +72,12 @@ func (pgSchemaColumn PgSchemaColumn) ToStringParquetSchemaField() string {
keyVals = append(keyVals, "precision="+field.Precision)
}

return strings.Join(keyVals, ", ")
return map[string]interface{}{
"Tag": strings.Join(keyVals, ", "),
}
}

func (pgSchemaColumn PgSchemaColumn) ToMapIcebergSchemaField() map[string]interface{} {
func (pgSchemaColumn PgSchemaColumn) ToIcebergSchemaFieldMap() map[string]interface{} {
field := pgSchemaColumn.toIcebergSchemaField()

return map[string]interface{}{
Expand Down
37 changes: 20 additions & 17 deletions src/storage_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,19 @@ type StorageBase struct {
func (storage *StorageBase) WriteParquetFile(fileWriter source.ParquetFile, pgSchemaColumns []PgSchemaColumn, loadRows func() [][]string) (recordCount int64, err error) {
defer fileWriter.Close()

stringSchemaFields := make([]string, len(pgSchemaColumns))
for i, pgSchemaColumn := range pgSchemaColumns {
stringSchemaFields[i] = pgSchemaColumn.ToStringParquetSchemaField()
LogDebug(storage.config, "Parquet schema field:", stringSchemaFields[i])
schemaMap := map[string]interface{}{
"Tag": "name=root",
"Fields": []map[string]interface{}{},
}
for _, pgSchemaColumn := range pgSchemaColumns {
fieldMap := pgSchemaColumn.ToParquetSchemaFieldMap()
schemaMap["Fields"] = append(schemaMap["Fields"].([]map[string]interface{}), fieldMap)
}
schemaJson, err := json.Marshal(schemaMap)
PanicIfError(err)

parquetWriter, err := writer.NewCSVWriter(stringSchemaFields, fileWriter, PARQUET_PARALLEL_NUMBER)
LogDebug(storage.config, "Parquet schema:", string(schemaJson))
parquetWriter, err := writer.NewJSONWriter(string(schemaJson), fileWriter, PARQUET_PARALLEL_NUMBER)
if err != nil {
return 0, fmt.Errorf("Failed to create Parquet writer: %v", err)
}
Expand All @@ -41,18 +47,15 @@ func (storage *StorageBase) WriteParquetFile(fileWriter source.ParquetFile, pgSc
rows := loadRows()
for len(rows) > 0 {
for _, row := range rows {
LogDebug(storage.config, "Parquet row:", row)
rowWithPointers := make([]*string, len(row))
for i := 0; i < len(row); i++ {
rowWithPointers[i] = pgSchemaColumns[i].FormatParquetValue(row[i])
// LogDebug(storage.config, "Pg schema column:", pgSchemaColumns[i].ToStringParquetSchemaField())
// if rowWithPointers[i] == nil {
// LogDebug(storage.config, "Parquet row value: nil")
// } else {
// LogDebug(storage.config, "Parquet row value:", *rowWithPointers[i])
// }
rowMap := make(map[string]interface{})
for i, rowValue := range row {
rowMap[pgSchemaColumns[i].ColumnName] = pgSchemaColumns[i].FormatParquetValue(rowValue)
}
if err = parquetWriter.WriteString(rowWithPointers); err != nil {
rowJson, err := json.Marshal(rowMap)
PanicIfError(err)

LogDebug(storage.config, "Parquet row:", string(rowJson))
if err = parquetWriter.Write(string(rowJson)); err != nil {
return 0, fmt.Errorf("Write error: %v", err)
}
recordCount++
Expand Down Expand Up @@ -301,7 +304,7 @@ func (storage *StorageBase) WriteMetadataFile(fileSystemPrefix string, filePath

icebergSchemaFields := make([]interface{}, len(pgSchemaColumns))
for i, pgSchemaColumn := range pgSchemaColumns {
icebergSchemaFields[i] = pgSchemaColumn.ToMapIcebergSchemaField()
icebergSchemaFields[i] = pgSchemaColumn.ToIcebergSchemaFieldMap()
}

metadata := map[string]interface{}{
Expand Down

0 comments on commit b2e6afe

Please sign in to comment.