I am using delta-rs to write to a Delta table in Delta Lake. Here is my code:
[code]
import time

import numpy as np
import pandas as pd
import pyarrow as pa
from deltalake.writer import write_deltalake

num_rows = 10
# Timestamps 10 ms apart, plus random sensor readings
timestamp = np.array([time.time() + i * 0.01 for i in range(num_rows)])
current = np.random.rand(num_rows) * 10
voltage = np.random.rand(num_rows) * 100
temperature = np.random.rand(num_rows) * 50

data = {
    "timestamp": timestamp,
    "current": current,
    "voltage": voltage,
    "temperature": temperature,
}
df = pd.DataFrame(data)

storage_options = {
    "AWS_DEFAULT_REGION": "us-west-2",
    "AWS_ACCESS_KEY_ID": "xxx",
    "AWS_SECRET_ACCESS_KEY": "xxx",
    "AWS_S3_ALLOW_UNSAFE_RENAME": "true",
}

# Explicit Arrow schema: four float64 columns
schema = pa.schema(
    [
        ("timestamp", pa.float64()),
        ("current", pa.float64()),
        ("voltage", pa.float64()),
        ("temperature", pa.float64()),
    ]
)

write_deltalake(
    "s3a://my-bucket/delta-tables/motor",
    df,
    mode="append",
    schema=schema,
    storage_options=storage_options,
)
[/code]
The code above successfully wrote the data, with all 4 columns, to the Delta table. I can confirm this with Spark SQL:
[code]
spark-sql> describe table delta.`s3a://my-bucket/delta-tables/motor`;
23/05/22 06:38:51 WARN ObjectStore: Failed to get database delta, returning NoSuchObjectException
timestamp	double
current	double
voltage	double
temperature	double

# Partitioning
Not partitioned
Time taken: 0.39 seconds, Fetched 7 row(s)

spark-sql> select * from delta.`s3a://my-bucket/delta-tables/motor` limit 10;
23/05/22 07:01:50 WARN ObjectStore: Failed to get database delta, returning NoSuchObjectException
1.683746477029865E9	7.604250297497938	9.421758439102415	72.1927369069416
1.683746477039865E9	0.09092487512480374	17.989035574705202	35.350210012093214
1.683746477049866E9	7.493128659573002	9.390891728445448	48.541259705334625
1.683746477059866E9	2.717780962917138	0.9268887657049119	59.10566692023579
1.683746477069866E9	2.57300442470119	17.486083607683693	47.23521355609355
1.683746477079866E9	2.09432242350117	14.945888123248054	47.125030870747715
1.683746477089866E9	4.136491853926207	16.52334128991138	27.544656909406505
1.6837464770998669E9	1.1299759566741152	5.539831633892187	52.50892511866684
1.6837464771098669E9	0.9626607062002979	8.400536671329352	72.49131313291358
1.6837464771198668E9	7.6866231204656446	4.033915109232906	48.900631068812075
Time taken: 5.925 seconds, Fetched 10 row(s)
[/code]
Now I am trying to write to the Delta table with a new column, `pressure`:
[code]
import time

import numpy as np
import pandas as pd
import pyarrow as pa
from deltalake.writer import write_deltalake

num_rows = 10
timestamp = np.array([time.time() + i * 0.01 for i in range(num_rows)])
current = np.random.rand(num_rows) * 10
voltage = np.random.rand(num_rows) * 100
temperature = np.random.rand(num_rows) * 50
pressure = np.random.rand(num_rows) * 1000

data = {
    "timestamp": timestamp,
    "current": current,
    "voltage": voltage,
    "temperature": temperature,
    "pressure": pressure,
}
df = pd.DataFrame(data)

storage_options = {
    "AWS_DEFAULT_REGION": "us-west-2",
    "AWS_ACCESS_KEY_ID": "xxx",
    "AWS_SECRET_ACCESS_KEY": "xxx",
    "AWS_S3_ALLOW_UNSAFE_RENAME": "true",
}

schema = pa.schema(
    [
        ("timestamp", pa.float64()),
        ("current", pa.float64()),
        ("voltage", pa.float64()),
        ("temperature", pa.float64()),
        ("pressure", pa.float64()),  #