Last active
September 10, 2021 13:31
-
-
Save Nhoutain/3b9b3a88a9b6c402331f179f02a3b6c6 to your computer and use it in GitHub Desktop.
Fix flow optimisation range ('A, 1, 2', and 'A, 2, 3' --> 'A, 1, 3')
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Drop earlier versions of the functions defined in this script, so that a
-- changed signature never leaves a stale overload behind.
DROP FUNCTION IF EXISTS public.flow_business_key(table_name TEXT);
DROP FUNCTION IF EXISTS public.flow_business_data(table_name TEXT);
DROP FUNCTION IF EXISTS public.flows();
DROP FUNCTION IF EXISTS public.fix_flows_successive_data(fix boolean);
DROP FUNCTION IF EXISTS public.fix_flow_successive_data(table_access TEXT, fix boolean);
DROP FUNCTION IF EXISTS public.do_fix_flow_successive_data(table_access TEXT, bkey TEXT[], data TEXT[], fix boolean);
-- NOTE(review): NEW_fix_flows_successive_data is not created anywhere in the
-- visible script; presumably this removes a leftover from an older version.
DROP FUNCTION IF EXISTS public.NEW_fix_flows_successive_data(fix boolean);
DROP FUNCTION IF EXISTS public.NEW_fix_flow_successive_data(table_access TEXT, size bigint, fix boolean);
DROP FUNCTION IF EXISTS public.NEW_do_fix_flow_successive_data(table_access TEXT, temp_table TEXT, bkey TEXT[], data TEXT[], index bigint, size bigint, fix boolean);
-- successiveRecord is created by this script too, so it is dropped like the others.
DROP FUNCTION IF EXISTS public.successiveRecord(first record, second record, popos TEXT[]);
-- ----------------------------------------------------------------------
-- FLOWS
-- ----------------------------------------------------------------------
-- Returns the business-key columns of a flow table.
--
-- The key is read from the definition of the table's index whose name ends in
-- "_bkey": everything listed before the trailing ", mt, mvf" columns of that
-- index is parsed into a text array.
--
-- Parameters:
--   table_name  unqualified table name, compared to pg_indexes.tablename.
-- Returns:
--   one row per matching index, with the key column names in bkey
--   (no row when the table has no matching index).
CREATE OR REPLACE FUNCTION public.flow_business_key(table_name TEXT)
RETURNS TABLE(bkey text[]) AS
$body$
    SELECT ('{' || (regexp_matches(indexes.indexdef, '\((.*), mt, mvf\)'))[1] || '}')::text[]
    FROM pg_indexes AS indexes
    WHERE indexes.tablename = $1
      -- "_" is a single-character wildcard in LIKE, so it is escaped: only
      -- index names that really end in "_bkey" may match.
      AND indexes.indexname LIKE '%\_bkey'
$body$
LANGUAGE sql;
-- Returns the data columns of a flow table as double-quoted identifiers.
--
-- Every column of the table is listed except the technical ones
-- (mvf, mvt, mt, mr, mm, mid, mn). Each name is wrapped in double quotes so
-- that it can be spliced into dynamically built SQL.
--
-- Parameters:
--   table_name  unqualified table name, compared to
--               information_schema.columns.table_name (no schema filter, so
--               same-named tables in several schemas all contribute).
-- Returns:
--   exactly one row; data is an empty array when nothing matches.
CREATE OR REPLACE FUNCTION public.flow_business_data(table_name TEXT)
RETURNS TABLE(data text[]) AS
$func$
DECLARE
    quoted_columns text[];
BEGIN
    quoted_columns := ARRAY(
        SELECT '"' || columns.column_name || '"'
        FROM information_schema.columns AS columns
        WHERE columns.table_name = $1
          AND columns.column_name NOT IN ('mvf', 'mvt', 'mt', 'mr', 'mm', 'mid', 'mn')
        -- Keep the declaration order so that the result is deterministic.
        ORDER BY columns.ordinal_position
    );
    RETURN QUERY SELECT quoted_columns;
END
$func$ LANGUAGE plpgsql;
-- Lists every flow table that belongs to an injection whose status is 'DONE'.
--
-- The expected table name is 'flow_' followed by the injection's flow_id with
-- the dashes removed.
--
-- Returns, per table:
--   table_access  schema-qualified table name
--   bkey          result of flow_business_key() for the table
--   data          result of flow_business_data() for the table
CREATE OR REPLACE FUNCTION public.flows()
RETURNS TABLE(table_access text, bkey text[], data text[]) AS
$body$
    SELECT
        t.table_schema || '.' || t.table_name AS table_access,
        flow_business_key(t.table_name) AS bkey,
        flow_business_data(t.table_name) AS data
    FROM information_schema.tables AS t
    WHERE t.table_name IN (
        SELECT 'flow_' || REPLACE(i.flow_id, '-', '')
        FROM injection AS i
        WHERE i.status = 'DONE'
    )
$body$
LANGUAGE sql;
-- ----------------------------------------------------------------------
-- FIX SUCCESSIVE
-- ----------------------------------------------------------------------
-- Runs do_fix_flow_successive_data() on every table returned by flows().
--
-- Parameters:
--   fix  passed through unchanged to do_fix_flow_successive_data();
--        defaults to false.
-- Returns:
--   the rows produced for each table, prefixed with the table's
--   schema-qualified name in table_access.
CREATE OR REPLACE FUNCTION public.fix_flows_successive_data(fix boolean DEFAULT false)
RETURNS TABLE(table_access text, mid text, new_mvf bigint) AS
$func$
DECLARE
    rec record;
    counter INTEGER := 0;
    total INTEGER := 0;
BEGIN
    total := (SELECT count(*)::INTEGER FROM flows());
    RAISE NOTICE 'Fixing successive on all tables [%]', total;
    FOR rec IN SELECT f.table_access, f.bkey, f.data FROM flows() AS f
    LOOP
        counter := counter + 1;
        RAISE NOTICE ' [%/%] ...', counter, total;
        -- Values are bound with USING rather than rendered as SQL literals:
        -- the arrays and the boolean keep their types, and the first output
        -- column is typed as text instead of being an untyped literal.
        -- Dynamic SQL is kept so that the column names mid / new_mvf cannot
        -- clash with this function's output variables of the same name.
        RETURN QUERY EXECUTE
            'SELECT $1, r.mid, r.new_mvf'
            || ' FROM public.do_fix_flow_successive_data($1, $2, $3, $4) AS r'
            USING rec.table_access, rec.bkey, rec.data, fix;
    END LOOP;
END
$func$ LANGUAGE plpgsql;
-- Runs do_fix_flow_successive_data() on a single flow table.
--
-- The business key and the data columns are looked up with
-- flow_business_key() / flow_business_data(), using table_access with the
-- 'flows.' schema prefix removed as the table name.
--
-- Parameters:
--   table_access  table name, normally qualified with the 'flows' schema.
--   fix           passed through unchanged; defaults to false.
-- Returns:
--   the rows produced by do_fix_flow_successive_data().
CREATE OR REPLACE FUNCTION public.fix_flow_successive_data(table_access TEXT, fix boolean DEFAULT false)
RETURNS TABLE(mid text, new_mvf bigint) AS
$func$
DECLARE
    short_name text := REPLACE(table_access, 'flows.', '');
    bkey text[];
    data text[];
BEGIN
    -- All values are bound with USING instead of being rendered as SQL
    -- literals and parsed back, so the arrays and the boolean keep their types.
    EXECUTE 'SELECT k.bkey FROM public.flow_business_key($1) AS k'
        INTO bkey
        USING short_name;
    EXECUTE 'SELECT d.data FROM public.flow_business_data($1) AS d'
        INTO data
        USING short_name;
    RETURN QUERY EXECUTE
        'SELECT r.mid, r.new_mvf'
        || ' FROM public.do_fix_flow_successive_data($1, $2, $3, $4) AS r'
        USING table_access, bkey, data, fix;
END
$func$ LANGUAGE plpgsql;
-- Merges successive validity ranges of one flow table.
--
-- Rows are read ordered by mt, the data columns, mvf and mvt. Two consecutive
-- rows belong to the same series when they share mt and all data columns and
-- the first row's mvt + 1 equals the second row's mvf. For every series the
-- first row (the head) is kept and receives the mvt of the last row; the
-- other rows of the series are removed.
--
-- Parameters:
--   table_access  table to process. It is spliced unquoted into the SQL text,
--                 so it must come from a trusted source.
--   bkey          business-key columns; only used in the NOTICE output.
--   data          data columns, already quoted as identifiers; they are
--                 spliced into PARTITION BY / ORDER BY.
--   fix           when true the table is really modified (DELETE / UPDATE);
--                 when false the changes are only reported.
-- Returns:
--   one row per affected table row: (mid, NULL) for a row to delete, and
--   (mid, value) for a series head, where value is the head's new mvt
--   (the column is named new_mvf for historical reasons).
-- Side effects:
--   creates the temporary table temp_update_<table> (ON COMMIT DROP), so the
--   function fails if called twice for the same table in one transaction.
CREATE OR REPLACE FUNCTION public.do_fix_flow_successive_data(table_access TEXT, bkey TEXT[], data TEXT[], fix boolean DEFAULT false)
RETURNS TABLE(mid text, new_mvf bigint) AS
$func$
DECLARE
    temp_table text := format('temp_update_%s', REPLACE(table_access, 'flows.', ''));
    flow_cursor REFCURSOR;
    previous_row RECORD;
    current_row RECORD;
    has_next boolean;
    is_successive boolean;
    merge_mid text;
    merge_mvt bigint;
    -- array_to_string() is applied to the arrays directly: wrapping them in
    -- array_agg() fails on NULL or empty arrays.
    bkey_string text := array_to_string(bkey, ', ');
    -- Building the lists from arrays keeps the SQL valid even when data is
    -- empty or NULL (no dangling comma).
    partition_columns text := array_to_string(ARRAY['mt'] || data, ', ');
    sort_columns text := array_to_string(ARRAY['mt'] || data || ARRAY['mvf', 'mvt'], ', ');
    counter bigint := 0;
BEGIN
    RAISE NOTICE ' Fixing successive on table [%] (%)', table_access, bkey_string;
    RAISE NOTICE ' Create temporary update for table [%]', table_access;
    EXECUTE format(' CREATE TEMPORARY TABLE %s (mid text, new_mvf bigint) ON COMMIT DROP', temp_table);
    RAISE NOTICE ' Open cursor on table [%]', table_access;
    -- see_next is the mid of the next row of the same partition, or NULL for
    -- the last row of a partition.
    OPEN flow_cursor SCROLL FOR EXECUTE format(
        'SELECT *, lead(mid, 1) over (partition by %1$s order by %2$s) as see_next'
        || ' FROM %3$s'
        || ' ORDER BY %2$s',
        partition_columns, sort_columns, table_access);

    FETCH NEXT FROM flow_cursor INTO current_row;
    IF FOUND THEN
        RAISE NOTICE ' Starting process';
        LOOP
            previous_row := current_row;
            FETCH NEXT FROM flow_cursor INTO current_row;
            has_next := FOUND;

            is_successive := false;
            IF has_next THEN
                counter := counter + 1;
                IF counter % 50000 = 0 THEN
                    RAISE NOTICE ' Process row %', counter;
                END IF;
                is_successive := coalesce(
                    previous_row.see_next IS NOT NULL
                    AND previous_row.mvt + 1 = current_row.mvf,
                    false);
            END IF;

            IF is_successive THEN
                IF merge_mid IS NULL THEN
                    -- We start a series: the previous row becomes its head.
                    merge_mid := previous_row.mid;
                END IF;
                EXECUTE format('INSERT INTO %s VALUES (%L::text, null)', temp_table, current_row.mid);
                IF fix THEN
                    EXECUTE format('DELETE FROM %s where mid = %L', table_access, current_row.mid);
                END IF;
                merge_mvt := current_row.mvt;
            ELSIF merge_mid IS NOT NULL THEN
                -- The series ends, either on a rupture or because the cursor
                -- is exhausted. Without the end-of-data case the last series
                -- would lose its rows without its head being extended.
                EXECUTE format('INSERT INTO %s VALUES (%L::text, %s)', temp_table, merge_mid, merge_mvt);
                IF fix THEN
                    EXECUTE format('UPDATE %s SET mvt = %s where mid = %L', table_access, merge_mvt, merge_mid);
                END IF;
                merge_mvt := NULL;
                merge_mid := NULL;
            END IF;

            EXIT WHEN NOT has_next;
        END LOOP;
    END IF;
    CLOSE flow_cursor;

    RETURN QUERY EXECUTE format('SELECT mid, new_mvf FROM %s', temp_table);
END
$func$ LANGUAGE plpgsql;
-- Merges the successive validity ranges of one flow table, chunk by chunk.
--
-- The table is processed in windows of `size` rows by calling
-- NEW_do_fix_flow_successive_data() once per window; all windows write into
-- the same temporary table temp_update_<table> (ON COMMIT DROP).
--
-- Parameters:
--   table_access  table to process, normally qualified with the 'flows'
--                 schema. It is spliced unquoted into the SQL text, so it
--                 must come from a trusted source.
--   size          number of rows per chunk; must be at least 1.
--   fix           passed through unchanged; defaults to false.
-- Returns:
--   the content of the temporary table, once, after the last chunk.
-- Raises:
--   an exception when size is NULL or lower than 1 (a size of 0 would never
--   advance and loop forever).
-- Limitation:
--   rows are only compared inside a chunk, so a series of successive rows
--   that straddles two chunks is not merged across the chunk boundary.
CREATE OR REPLACE FUNCTION public.NEW_fix_flow_successive_data(table_access TEXT, size bigint, fix boolean DEFAULT false)
RETURNS TABLE(mid text, new_mvf bigint) AS
$func$
DECLARE
    short_name text := REPLACE(table_access, 'flows.', '');
    temp_table text := format('temp_update_%s', short_name);
    bkey text[];
    data text[];
    max_size bigint;
    processed bigint := 0;  -- rows covered so far, in positions of the initial table
    removed bigint := 0;    -- rows already deleted from the table by previous chunks
BEGIN
    IF size IS NULL OR size < 1 THEN
        RAISE EXCEPTION 'size must be a positive number of rows, got %', size;
    END IF;

    EXECUTE 'SELECT k.bkey FROM public.flow_business_key($1) AS k'
        INTO bkey
        USING short_name;
    EXECUTE 'SELECT d.data FROM public.flow_business_data($1) AS d'
        INTO data
        USING short_name;
    EXECUTE format('select count(*) from %s', table_access)
        INTO max_size;
    RAISE NOTICE ' Create temporary update for table [%]', table_access;
    EXECUTE format(' CREATE TEMPORARY TABLE %s (mid text, new_mvf bigint) ON COMMIT DROP', temp_table);
    RAISE NOTICE 'Fixing table % with max size %', table_access, max_size;

    WHILE max_size > processed LOOP
        RAISE NOTICE ' index [%/%] ...', processed, max_size;
        -- Each call returns the whole temporary table accumulated so far, so
        -- its result is discarded here and the table is returned once at the
        -- end; returning it per chunk would repeat earlier rows.
        -- Rows deleted by previous chunks shift the remaining rows towards
        -- the start of the table, hence the offset correction.
        EXECUTE 'SELECT count(*)'
            || ' FROM public.NEW_do_fix_flow_successive_data($1, $2, $3, $4, $5, $6, $7)'
            USING table_access, temp_table, bkey, data, processed - removed, size, fix;
        IF fix THEN
            -- A NULL new_mvf marks a row that has been deleted from the table.
            EXECUTE format('SELECT count(*) FROM %s WHERE new_mvf IS NULL', temp_table)
                INTO removed;
        END IF;
        processed := processed + size;
    END LOOP;

    RETURN QUERY EXECUTE format('SELECT mid, new_mvf FROM %s', temp_table);
END
$func$ LANGUAGE plpgsql;
-- Merges successive validity ranges inside one window of a flow table.
--
-- The window is the `size` rows found after skipping `index` rows of the
-- table ordered by the business key, mt and mvf. Consecutive rows for which
-- successiveRecord() is true form a series: the first row (the head) is kept
-- and receives the mvt of the last row, the other rows are removed.
--
-- Parameters:
--   table_access  table to process. It is spliced unquoted into the SQL text,
--                 so it must come from a trusted source.
--   temp_table    existing table (mid text, new_mvf bigint) receiving the
--                 report; this function only inserts into it.
--   bkey          business-key columns, used for the ordering.
--   data          data columns handed to successiveRecord().
--   index         number of rows to skip (OFFSET).
--   size          number of rows in the window.
--   fix           when true the table is really modified (DELETE / UPDATE);
--                 when false the changes are only reported.
-- Returns:
--   the whole content of temp_table, including rows inserted by earlier
--   calls: (mid, NULL) for a row to delete, (mid, value) for a series head,
--   where value is the head's new mvt (the column is named new_mvf for
--   historical reasons).
CREATE OR REPLACE FUNCTION public.NEW_do_fix_flow_successive_data(table_access TEXT, temp_table text, bkey TEXT[], data TEXT[], index bigint, size bigint, fix boolean DEFAULT false)
RETURNS TABLE(mid text, new_mvf bigint) AS
$func$
DECLARE
    chunk_cursor REFCURSOR;
    counter bigint := 0;
    previous_row record;
    current_row record;
    has_next boolean;
    is_successive boolean;
    -- array_to_string() is applied to the array directly: wrapping it in
    -- array_agg() fails on NULL or empty arrays.
    bkey_string text := array_to_string(bkey, ', ');
    -- Building the list from arrays keeps the ORDER BY valid even when bkey
    -- is empty or NULL (no dangling comma).
    sort_columns text := array_to_string(bkey || ARRAY['mt', 'mvf'], ', ');
    merge_mid text;
    merge_mvt bigint;
BEGIN
    RAISE NOTICE ' Fixing successive on table [%] (%)', table_access, bkey_string;
    RAISE NOTICE ' Open cursor on table [%]', table_access;
    OPEN chunk_cursor SCROLL FOR EXECUTE format(
        'SELECT * FROM %s ORDER BY %s offset %s rows fetch first %s rows only',
        table_access, sort_columns, index, size);

    FETCH NEXT FROM chunk_cursor INTO current_row;
    IF FOUND THEN
        RAISE NOTICE ' Starting process';
        LOOP
            previous_row := current_row;
            FETCH NEXT FROM chunk_cursor INTO current_row;
            has_next := FOUND;

            is_successive := false;
            IF has_next THEN
                counter := counter + 1;
                IF counter % 50000 = 0 THEN
                    RAISE NOTICE ' Process row %', counter;
                END IF;
                is_successive := coalesce(
                    public.successiveRecord(previous_row, current_row, data),
                    false);
            END IF;

            IF is_successive THEN
                IF merge_mid IS NULL THEN
                    -- We start a series: the previous row becomes its head.
                    merge_mid := previous_row.mid;
                END IF;
                EXECUTE format('INSERT INTO %s VALUES (%L::text, null)', temp_table, current_row.mid);
                IF fix THEN
                    EXECUTE format('DELETE FROM %s where mid = %L', table_access, current_row.mid);
                END IF;
                merge_mvt := current_row.mvt;
            ELSIF merge_mid IS NOT NULL THEN
                -- The series ends, either on a rupture or because the window
                -- is exhausted. Without the end-of-window case the last series
                -- would lose its rows without its head being extended.
                EXECUTE format('INSERT INTO %s VALUES (%L::text, %s)', temp_table, merge_mid, merge_mvt);
                IF fix THEN
                    EXECUTE format('UPDATE %s SET mvt = %s where mid = %L', table_access, merge_mvt, merge_mid);
                END IF;
                merge_mvt := NULL;
                merge_mid := NULL;
            END IF;

            EXIT WHEN NOT has_next;
        END LOOP;
    END IF;
    CLOSE chunk_cursor;

    RETURN QUERY EXECUTE format('SELECT mid, new_mvf FROM %s', temp_table);
END
$func$ LANGUAGE plpgsql;
-- Tells whether `second` directly continues `first`.
--
-- This is the case when both rows have the same mt, the mvt of `first` plus
-- one equals the mvf of `second`, and every column listed in popos holds the
-- same value in both rows (two NULLs count as the same value).
--
-- Parameters:
--   first, second  rows to compare; both must expose mt, mvf and mvt.
--   popos          names of the columns to compare. Names wrapped in double
--                  quotes, as produced by flow_business_data(), are accepted:
--                  the quotes are removed before the lookup, because the
--                  JSON keys of a row are the bare column names.
-- Returns:
--   true when the rows are successive, false otherwise (including when mt,
--   mvt or mvf is NULL).
CREATE OR REPLACE FUNCTION public.successiveRecord(first record, second record, popos text[])
RETURNS bool AS
$func$
DECLARE
    c text;
    column_key text;
    first_json json;
    second_json json;
BEGIN
    IF (first.mt = second.mt AND first.mvt + 1 = second.mvf) IS NOT TRUE THEN
        RETURN false;
    END IF;

    first_json := to_json(first);
    second_json := to_json(second);
    FOREACH c IN ARRAY coalesce(popos, ARRAY[]::text[]) LOOP
        -- Without removing the quotes the lookups below would both miss the
        -- key, yield NULL, and the column would silently never be compared.
        column_key := btrim(c, '"');
        IF (first_json ->> column_key) IS DISTINCT FROM (second_json ->> column_key) THEN
            RETURN false;
        END IF;
    END LOOP;

    RETURN true;
END
$func$ LANGUAGE plpgsql;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment