sink emitting more rows than expected, followup of #10853 #13025

Closed
docteurklein opened this issue Oct 24, 2023 · 7 comments
Labels
type/bug Something isn't working user-feedback

docteurklein commented Oct 24, 2023

Describe the bug

followup of #10853

Looks like I'm still getting extraneous sink output when changing only one element of a JSON structure that contains siblings.

version: PostgreSQL 9.5-RisingWave-1.3.0-alpha (bb2319e)

on the MySQL source:

update pim_catalog_product set raw_values = 
  JSON_MERGE_PATCH(raw_values, '{"desc3": {"<all_channels>": {"<all_locales>": 4}}}')
where id = 1;
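
For context, MySQL documents JSON_MERGE_PATCH as an RFC 7396 merge, so the statement above should only add the desc3 member and leave sibling attributes untouched. A minimal Python sketch of that merge follows. The prior contents of raw_values (name and desc2, with their values) are assumed for illustration, since the actual row is not shown in this report, and merge_patch is a hypothetical helper, not a MySQL or RisingWave API.

```python
import json


def merge_patch(target, patch):
    """Apply an RFC 7396 style merge patch and return the merged document."""
    if not isinstance(patch, dict):
        return patch  # a non-object patch replaces the target outright
    result = dict(target) if isinstance(target, dict) else {}
    for key, value in patch.items():
        if value is None:
            result.pop(key, None)  # a null member removes the key
        else:
            result[key] = merge_patch(result.get(key), value)
    return result


# Assumed prior contents of raw_values (not taken from the report).
before = {
    "name": {"<all_channels>": {"<all_locales>": "foo"}},
    "desc2": {"<all_channels>": {"<all_locales>": 4}},
}
patch = json.loads('{"desc3": {"<all_channels>": {"<all_locales>": 4}}}')
after = merge_patch(before, patch)

# Which top-level attributes actually differ after the update?
changed = sorted(k for k in after if after[k] != before.get(k))
unchanged = sorted(k for k in after if k in before and after[k] == before[k])
print("changed:", changed)
print("unchanged:", unchanged)
```

Under these assumptions only desc3 is logically changed, even though the whole raw_values column is rewritten.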

expected output:

{
  "topic": "product_value_edited",
  "key": "{\"attribute\":\"desc3\",\"channel\":null,\"locale\":\"\u003call_locales\",\"product_id\":1}",
  "value": "{\"attribute\":\"desc3\",\"channel\":null,\"locale\":\"\u003call_locales\",\"product_id\":1,\"value\":\"4\"}",
  "timestamp": 1698077863141,
  "partition": 0,
  "offset": 45
}

actual output:

{
  "topic": "product_value_edited",
  "key": "{\"attribute\":\"desc2\",\"channel\":null,\"locale\":\"\u003call_locales\",\"product_id\":1}",
  "timestamp": 1698077852140,
  "partition": 0,
  "offset": 41
}
{
  "topic": "product_value_edited",
  "key": "{\"attribute\":\"desc2\",\"channel\":null,\"locale\":\"\u003call_locales\",\"product_id\":1}",
  "value": "{\"attribute\":\"desc2\",\"channel\":null,\"locale\":\"\u003call_locales\",\"product_id\":1,\"value\":\"4\"}",
  "timestamp": 1698077852140,
  "partition": 0,
  "offset": 42
}
{
  "topic": "product_value_edited",
  "key": "{\"attribute\":\"name\",\"channel\":null,\"locale\":\"\u003call_locales\",\"product_id\":1}",
  "timestamp": 1698077863141,
  "partition": 0,
  "offset": 43
}
{
  "topic": "product_value_edited",
  "key": "{\"attribute\":\"name\",\"channel\":null,\"locale\":null,\"product_id\":1}",
  "timestamp": 1698077863141,
  "partition": 0,
  "offset": 44
}
{
  "topic": "product_value_edited",
  "key": "{\"attribute\":\"desc3\",\"channel\":null,\"locale\":\"\u003call_locales\",\"product_id\":1}",
  "value": "{\"attribute\":\"desc3\",\"channel\":null,\"locale\":\"\u003call_locales\",\"product_id\":1,\"value\":\"4\"}",
  "timestamp": 1698077863141,
  "partition": 0,
  "offset": 45
}
{
  "topic": "product_value_edited",
  "key": "{\"attribute\":\"name\",\"channel\":null,\"locale\":\"\u003call_locales\",\"product_id\":1}",
  "value": "{\"attribute\":\"name\",\"channel\":null,\"locale\":\"\u003call_locales\",\"product_id\":1,\"value\":\"{}\"}",
  "timestamp": 1698077863141,
  "partition": 0,
  "offset": 46
}
{
  "topic": "product_value_edited",
  "key": "{\"attribute\":\"name\",\"channel\":null,\"locale\":null,\"product_id\":1}",
  "value": "{\"attribute\":\"name\",\"channel\":null,\"locale\":null,\"product_id\":1,\"value\":\"0.4290362600507552\"}",
  "timestamp": 1698077863141,
  "partition": 0,
  "offset": 47
}
@docteurklein docteurklein added the type/bug Something isn't working label Oct 24, 2023
@github-actions github-actions bot added this to the release-1.4 milestone Oct 24, 2023
Contributor

st1page commented Oct 24, 2023

I guess the key is expected to be pim_catalog_product.id instead of the full raw_values.

Would you please provide the complete CREATE SINK statement and the explain result of it?

explain CREATE SINK sink_name  ....

Also the table's definition in the downstream PG.

Author

docteurklein commented Oct 24, 2023

for sure, sorry for not including it (I've got the source code here just in case: https://github.com/docteurklein/risingwave-demo/blob/main/rw/product.sql)

sink DDL

create sink product_value_edited from pim1.product_value
with (
    connector='kafka',
    type='upsert',
    primary_key='product_id, attribute, channel, locale',
    -- force_append_only='true',
    properties.bootstrap.server='redpanda:9092',
    topic='product_value_edited'
);

explain create sink

 StreamSink { type: upsert, columns: [product_id, attribute, channel, locale, value, product_value.product.uuid(hidden), product_value.projected_row_id(hidden), product_value.projected_row_id#1(hidden), product_value.projected_row_id#2(hidden)], pk: [product_value.product.uuid, product_value.projected_row_id, product_value.projected_row_id#1, product_value.projected_row_id#2] }
 └─StreamTableScan { table: product_value, columns: [product_id, attribute, channel, locale, value, product.uuid, projected_row_id, projected_row_id#1, projected_row_id#2] }

mysql source DDL

create table pim_catalog_product (
  uuid binary(16),
  family_id int,
  product_model_id int,
  family_variant_id int,
  id int,
  is_enabled bool,
  identifier text,
  raw_values json,
  created timestamp,
  updated timestamp,
  primary key (uuid)
);

insert into pim_catalog_product (uuid, id, family_id, raw_values) values
(UUID_TO_BIN(uuid()), 1, 1, '{}');

I'll gladly provide anything that could help, Thanks for the quick answer!

@st1page st1page self-assigned this Oct 24, 2023
Contributor

st1page commented Oct 31, 2023

I guess you should change the primary key to primary_key='product_id'? The actual output consists exactly of records with different pks. 🤔 Can you provide the result of SELECT * FROM pim1.product_value?

Member

stdrc commented Oct 31, 2023

It looks like the UPDATE command merges "desc3": ... into raw_values, which may already be {"name": ..., "desc2": ...}. Hence, this UPDATE appears to RisingWave as a whole-row update.

Then, during jsonb_each, RW expands the row update into multiple deletes and inserts. In this process, RW cannot easily merge the unchanged "name" and "desc2" rows due to our internal mechanism. This finally leads to multiple update records on the sink.

We may support update compaction by sink pk when sinking in the future (still investigating), but currently this seems to be expected behavior (as long as the final output is correct). If the final result is actually wrong, please notify us; it may need further debugging.
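
The effect described above can be sketched as a toy model in Python. This is not RisingWave's implementation: expand, naive_changes, and compact_by_pk are hypothetical helpers, and the raw_values contents are assumed. It only illustrates why a whole-row update that is re-expanded per key yields a delete and an insert for every derived row, and what compaction by sink pk would leave behind.

```python
def expand(product_id, raw_values):
    """Flatten one source row into {sink_pk: value}, loosely like jsonb_each."""
    rows = {}
    for attribute, channels in raw_values.items():
        for channel, locales in channels.items():
            for locale, value in locales.items():
                rows[(product_id, attribute, channel, locale)] = value
    return rows


def naive_changes(old_rows, new_rows):
    """Whole-row update: delete every old derived row, then insert every new one."""
    deletes = [("delete", pk, value) for pk, value in old_rows.items()]
    inserts = [("insert", pk, value) for pk, value in new_rows.items()]
    return deletes + inserts


def compact_by_pk(changes):
    """Keep only the net effect per pk (assumed model of 'compaction by sink pk')."""
    first_old = {}  # pk -> value before the batch, if the row existed
    last_new = {}   # pk -> value after the batch, if the row still exists
    seen = set()
    for op, pk, value in changes:
        if pk not in seen:
            seen.add(pk)
            if op == "delete":
                first_old[pk] = value
        if op == "delete":
            last_new.pop(pk, None)
        else:
            last_new[pk] = value
    out = []
    for pk in sorted(seen):
        if pk in first_old and pk in last_new:
            if first_old[pk] != last_new[pk]:
                out.append(("update", pk, last_new[pk]))
        elif pk in last_new:
            out.append(("insert", pk, last_new[pk]))
        elif pk in first_old:
            out.append(("delete", pk, first_old[pk]))
    return out


# Assumed document contents before and after the UPDATE.
old_doc = {
    "name": {"<all_channels>": {"<all_locales>": "foo"}},
    "desc2": {"<all_channels>": {"<all_locales>": 4}},
}
new_doc = dict(old_doc, desc3={"<all_channels>": {"<all_locales>": 4}})

naive = naive_changes(expand(1, old_doc), expand(1, new_doc))
compacted = compact_by_pk(naive)
print(len(naive), "changes without compaction")
print(compacted)
```

In this model the uncompacted stream carries five changes, while the compacted one carries only the insert for desc3.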

@docteurklein
Author

Sorry, no computer access right now, but yes, that's exactly what @stdrc described. The output always contains correct values, although some are extraneous since some rows of the mat view didn't change (like name and desc2 in the example).

Ok so I'll wait for update compaction then. Thanks for the clarifications!
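
Until such compaction is available, a consumer could drop the no-op records itself. A rough Python sketch follows, assuming that records without a value are tombstones (deletes) and that the consumer processes records in batches. net_changes and the shortened keys are hypothetical and not part of any Kafka client API.

```python
def net_changes(state, batch):
    """Apply a batch of upsert records and return (new_state, effective changes).

    `state` maps record key -> last value seen. A record without a "value"
    field is treated as a tombstone (assumption). The returned changes map
    each key whose value really differs to its new value (None if deleted).
    """
    new_state = dict(state)
    for record in batch:
        value = record.get("value")
        if value is None:
            new_state.pop(record["key"], None)
        else:
            new_state[record["key"]] = value
    touched = {record["key"] for record in batch}
    changes = {
        key: new_state.get(key)
        for key in touched
        if state.get(key) != new_state.get(key)
    }
    return new_state, changes


# Shortened keys stand in for the JSON keys shown in the actual output.
state = {"name": "foo", "desc2": "4"}
batch = [
    {"key": "desc2"},                # tombstone
    {"key": "name"},                 # tombstone
    {"key": "desc3", "value": "4"},  # the only real change
    {"key": "desc2", "value": "4"},  # re-insert of an unchanged row
    {"key": "name", "value": "foo"}, # re-insert of an unchanged row
]
state, changes = net_changes(state, batch)
print(changes)
```

The batch boundary matters here: filtering record by record would still see each tombstone as a real delete, so the comparison is made against the state before the batch.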

Contributor

st1page commented Apr 8, 2024

I think it has been fixed after #15345 and ok in release v1.8

@st1page st1page modified the milestones: release-1.8, release-1.9 Apr 8, 2024
@stdrc stdrc changed the title from "followup of #10853" to "sink emitting more rows than expected, followup of #10853" Apr 9, 2024
Member

stdrc commented Apr 9, 2024

> I think it has been fixed after #15345 and ok in release v1.8

Closing this issue since the fix has been merged.

@stdrc stdrc closed this as completed Apr 9, 2024