Last active
September 2, 2021 08:09
-
-
Save Nhoutain/99c50bdd9a2b31155a4c62bd6de559c4 to your computer and use it in GitHub Desktop.
Duplicate data on mvt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Remove any previous version of the helpers before (re)creating them, so a
-- change of return type does not make CREATE OR REPLACE fail.
-- Only the argument types identify a function; argument names are omitted.
DROP FUNCTION IF EXISTS public.flow_business_key(text);
DROP FUNCTION IF EXISTS public.flows();
DROP FUNCTION IF EXISTS public.fix_flows_duplicate_data(boolean);
DROP FUNCTION IF EXISTS public.fix_flow_duplicate_data(text, boolean);
DROP FUNCTION IF EXISTS public.fix_flow_bkey_duplicate_data(text, text[], boolean);
-- ---------------------------------------------------------------------- | |
-- FLOWS | |
-- ---------------------------------------------------------------------- | |
-- Returns the business-key column list of a table as a text array.
--
-- The key is recovered from the definition of every index on `table_name`
-- whose name ends in `_bkey`: the text that precedes the trailing `, mt, mvf`
-- inside the index column list is wrapped in braces and cast to text[].
--
-- Parameters:
--   table_name  unqualified table name, matched against pg_indexes.tablename
--               (the schema is NOT filtered).
-- Returns:
--   one row per matching index whose definition ends in `, mt, mvf)`;
--   no row at all when nothing matches.
CREATE OR REPLACE FUNCTION public.flow_business_key(table_name TEXT)
RETURNS TABLE(bkey text[]) AS
$body$
    SELECT ('{' || (regexp_matches(idx.indexdef, '\((.*), mt, mvf\)'))[1] || '}')::text[]
    FROM pg_indexes AS idx
    WHERE idx.tablename = $1
      -- `_` is a single-character wildcard in LIKE: escape it so that only
      -- index names literally ending in `_bkey` are accepted.
      AND idx.indexname LIKE '%!_bkey' ESCAPE '!'
$body$
LANGUAGE sql;
-- Lists the tables named after an injection whose status is 'DONE'.
--
-- A candidate table name is 'flow_' followed by the injection's flow_id with
-- every '-' removed. Each table of information_schema.tables carrying such a
-- name is returned as:
--   table_access  schema-qualified name (table_schema.table_name)
--   bkey          business key reported by flow_business_key(table_name)
-- flow_business_key is a set-returning call in the select list, so a table
-- for which it yields no row does not appear in the result.
CREATE OR REPLACE FUNCTION public.flows()
RETURNS TABLE(table_access text, bkey text[]) AS
$body$
    WITH done_flow_tables AS (
        SELECT DISTINCT ('flow_' || REPLACE(flow_id, '-', '')) AS flow_table
        FROM injection
        WHERE status = 'DONE'
    )
    SELECT
        t.table_schema || '.' || t.table_name AS table_access,
        flow_business_key(t.table_name) AS bkey
    FROM information_schema.tables AS t
    WHERE t.table_name IN (SELECT flow_table FROM done_flow_tables)
$body$
LANGUAGE sql;
-- ---------------------------------------------------------------------- | |
-- FIX DUPLICATE | |
-- ---------------------------------------------------------------------- | |
-- Runs fix_flow_bkey_duplicate_data on every table reported by flows().
--
-- Parameters:
--   fix  forwarded unchanged to fix_flow_bkey_duplicate_data (default false).
-- Returns:
--   the rows produced by fix_flow_bkey_duplicate_data for each table,
--   prefixed with the table_access value they belong to.
-- Progress is reported through RAISE NOTICE ([current/total]).
CREATE OR REPLACE FUNCTION public.fix_flows_duplicate_data(fix boolean DEFAULT false)
RETURNS TABLE(table_access text, mid text, old_mvt bigint, new_mvt bigint) AS
$func$
DECLARE
    rec     record;
    counter INTEGER := 0;
    total   INTEGER := 0;
BEGIN
    total := (SELECT count(*)::INTEGER FROM flows());
    RAISE NOTICE 'Fixing duplicate on all tables [%]', total;

    FOR rec IN SELECT * FROM flows()
    LOOP
        counter := counter + 1;
        RAISE NOTICE ' [%/%] ...', counter, total;

        -- Dynamic SQL keeps the column names out of reach of the OUT
        -- parameters of the same name. Values are bound with USING rather
        -- than spliced in as quoted literals, so the text[] key is passed
        -- as-is instead of being serialised to text and parsed back.
        RETURN QUERY EXECUTE
            'SELECT $1, * FROM public.fix_flow_bkey_duplicate_data($1, $2, $3)'
            USING rec.table_access, rec.bkey, fix;
    END LOOP;
END
$func$ LANGUAGE plpgsql;
-- Runs fix_flow_bkey_duplicate_data on one table, looking its business key up
-- with flow_business_key.
--
-- Parameters:
--   table_access  table reference; every occurrence of 'flows.' is removed
--                 before the name is handed to flow_business_key.
--   fix           forwarded unchanged to fix_flow_bkey_duplicate_data
--                 (default false).
-- Returns:
--   the (mid, old_mvt, new_mvt) rows produced by fix_flow_bkey_duplicate_data.
-- Raises:
--   an exception when flow_business_key yields no key for the table.
CREATE OR REPLACE FUNCTION public.fix_flow_duplicate_data(table_access TEXT, fix boolean DEFAULT false)
RETURNS TABLE(mid text, old_mvt bigint, new_mvt bigint) AS
$func$
DECLARE
    bkey text[];
BEGIN
    -- Only the first row is kept if several *_bkey indexes match.
    EXECUTE 'SELECT bkey FROM public.flow_business_key($1)'
        INTO bkey
        USING REPLACE(table_access, 'flows.', '');

    -- Without this guard a missing key used to surface further down as a
    -- "malformed array literal" error that said nothing about the cause.
    IF bkey IS NULL THEN
        RAISE EXCEPTION 'No business key found for table [%]', table_access
            USING HINT = 'An index named like <something>_bkey ending with (..., mt, mvf) is expected.';
    END IF;

    -- Values are bound with USING instead of being spliced in as literals.
    RETURN QUERY EXECUTE
        'SELECT mid, old_mvt, new_mvt FROM public.fix_flow_bkey_duplicate_data($1, $2, $3)'
        USING table_access, bkey, fix;
END
$func$ LANGUAGE plpgsql;
-- Detects (and optionally repairs) inconsistent `mvt` values between
-- consecutive rows of a table that share the same business key and `mt`.
--
-- The table is walked in (business key, mt, mvf, mvt) order. For each pair of
-- consecutive rows belonging to the same (business key, mt) group:
--   1. if the previous row's mvt is greater than the current row's mvt, the
--      current row's mvt is raised to the previous row's mvt;
--   2. if the previous row's mvt is greater than or equal to the current
--      row's mvf, the previous row's mvt is lowered to (current mvf - 1).
-- The value carried over as "previous mvt" is the adjusted one, so a change
-- made by rule 1 is taken into account when the next pair is examined.
-- NOTE(review): mvf / mvt look like validity-from / validity-to bounds —
-- confirm against the table definition.
--
-- Parameters:
--   table_access  table reference, interpolated as-is into the queries.
--   bkey          business-key entries, interpolated as-is (comma separated)
--                 into PARTITION BY / ORDER BY.
--                 NOTE(review): both are raw SQL fragments; only pass values
--                 coming from the catalog or from trusted callers.
--   fix           when true the adjustments are applied with UPDATE;
--                 when false (default) they are only reported.
-- Returns:
--   one row per adjustment: mid of the row, its mvt before, its mvt after.
--   The same mid can appear twice (once per rule).
-- Raises:
--   an exception when the business key is NULL or has no usable entry.
CREATE OR REPLACE FUNCTION public.fix_flow_bkey_duplicate_data(table_access TEXT, bkey TEXT[], fix boolean DEFAULT false)
RETURNS TABLE(mid text, old_mvt bigint, new_mvt bigint) AS
$func$
DECLARE
    temp_table   text := format('temp_update_%s', REPLACE(table_access, 'flows.', ''));
    row_cursor   REFCURSOR;
    previous_row RECORD;
    current_row  RECORD;
    previous_mvt bigint;
    current_mvt  bigint;
    bkey_string  text := array_to_string(bkey, ', ');
    counter      bigint := 0;
BEGIN
    -- An empty key would otherwise produce "PARTITION BY , mt" and fail with
    -- a syntax error that hides the real cause.
    IF bkey_string IS NULL OR bkey_string = '' THEN
        RAISE EXCEPTION 'Business key is empty for table [%]', table_access;
    END IF;

    RAISE NOTICE ' Fixing duplicate on table [%] (%)', table_access, bkey_string;

    RAISE NOTICE ' Create temporary update for table [%]', table_access;
    -- ON COMMIT DROP only removes the table at commit: reuse and empty it so
    -- the function can be called more than once in the same transaction.
    EXECUTE format(
        'CREATE TEMPORARY TABLE IF NOT EXISTS %I (mid text, old_mvt bigint, new_mvt bigint) ON COMMIT DROP',
        temp_table);
    EXECUTE format('TRUNCATE pg_temp.%I', temp_table);

    RAISE NOTICE ' Open cursor on table [%]', table_access;
    -- see_next is the mid of the following row of the same (key, mt) group,
    -- or NULL for the last row of a group.
    OPEN row_cursor SCROLL FOR EXECUTE format(
        'SELECT mid, mvf, mvt,'
        || ' lead(mid, 1) OVER (PARTITION BY %s, mt ORDER BY %s, mt, mvf, mvt) AS see_next'
        || ' FROM %s'
        || ' ORDER BY %s, mt, mvf, mvt',
        bkey_string, bkey_string, table_access, bkey_string);

    FETCH NEXT FROM row_cursor INTO current_row;
    IF FOUND THEN
        current_mvt := current_row.mvt;
        RAISE NOTICE ' Starting process';

        LOOP
            previous_row := current_row;
            previous_mvt := current_mvt;

            FETCH NEXT FROM row_cursor INTO current_row;
            EXIT WHEN NOT FOUND;
            current_mvt := current_row.mvt;

            counter := counter + 1;
            IF counter % 50000 = 0 THEN
                RAISE NOTICE ' Process row %', counter;
            END IF;

            -- The previous row was the last of its group: nothing to compare.
            CONTINUE WHEN previous_row.see_next IS NULL;

            -- Rule 1: raise the current row's mvt to the previous row's mvt.
            IF previous_mvt > current_row.mvt THEN
                EXECUTE format(
                    'INSERT INTO pg_temp.%I (mid, old_mvt, new_mvt) VALUES ($1::text, $2, $3)',
                    temp_table)
                    USING current_row.mid, current_row.mvt, previous_mvt;
                IF fix THEN
                    EXECUTE format('UPDATE %s SET mvt = $1 WHERE mid = $2', table_access)
                        USING previous_mvt, current_row.mid;
                END IF;
                -- Carried over as "previous mvt" for the next pair.
                current_mvt := previous_mvt;
            END IF;

            -- Rule 2: lower the previous row's mvt to just before the
            -- current row's mvf.
            IF previous_mvt >= current_row.mvf THEN
                EXECUTE format(
                    'INSERT INTO pg_temp.%I (mid, old_mvt, new_mvt) VALUES ($1::text, $2, $3)',
                    temp_table)
                    USING previous_row.mid, previous_mvt, current_row.mvf - 1;
                IF fix THEN
                    EXECUTE format('UPDATE %s SET mvt = $1 WHERE mid = $2', table_access)
                        USING current_row.mvf - 1, previous_row.mid;
                END IF;
            END IF;
        END LOOP;
    END IF;

    CLOSE row_cursor;

    RETURN QUERY EXECUTE format('SELECT mid, old_mvt, new_mvt FROM pg_temp.%I', temp_table);
END
$func$ LANGUAGE plpgsql;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment