Created
January 2, 2020 03:25
-
-
Save Hc747/129ce75e6b4237791e8d1f33b737d694 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package au.edu.uac.apply.services | |
import au.edu.uac.agentportal.model.channel.InstitutionChannel | |
import grails.gorm.transactions.Transactional | |
import groovy.sql.GroovyRowResult | |
import groovy.sql.Sql | |
import java.util.concurrent.ConcurrentHashMap | |
import static grails.async.Promises.task | |
/**
 * Copies course offering reference data from the database behind
 * {@code dataSource_apply} into the database behind {@code dataSource}.
 *
 * For every table listed in {@link #SCHEMA_MAPPING} the service runs a SELECT
 * against the source database, renders the result as a single
 * {@code INSERT ... ON DUPLICATE KEY UPDATE} statement and executes that
 * statement against the target database. The work runs asynchronously and at
 * most one pull per institution value is in flight at any time.
 */
@Transactional
class CourseOfferingPullService {

    //TODO: ensure column name are aligned with the database table; parameterise institution_code
    /** Source query for {@code channel_course}; every alias is prefixed with the target table name. */
    private static final String CHANNEL_COURSE_QUERY = '''
        SELECT
            c.course_code AS channel_course_id,
            0 AS channel_course_version,
            c.institution_code AS channel_course_institution_channel_id,
            c.course_code AS channel_course_channel_course_code,
            cp.property_value AS channel_course_display_code,
            cc.cricos_course_code AS channel_course_cricos_code,
            c.course_status AS channel_course_status,
            c.course_desc_short AS channel_course_description,
            c.course_level AS channel_course_level,
            c.open_to_year12_flag AS channel_course_open_to_year12_flag,
            c.open_to_os_flag AS channel_course_open_to_os_flag,
            c.campus_location AS channel_course_campus_location,
            c.acknowledgement_text AS channel_course_acknowledgement_text
        FROM course AS c
        LEFT JOIN cricos_course AS cc ON c.course_code = cc.course_code
        LEFT JOIN course_properties AS cp ON c.course_code = cp.domain_code AND cp.property_name = 'ALL-PORTAL-AU01'
        WHERE c.institution_code = 'AU'
    '''.trim()

    //TODO: ensure column name are aligned with the database table; parameterise institution_code
    /** Source query for {@code course_combination}; every alias is prefixed with the target table name. */
    private static final String COURSE_COMBINATION_QUERY = '''
        SELECT
            cc.course_code as course_combination_course_code,
            cc.seq_num as course_combination_seq_num,
            cc.course_code_selectable as course_combination_course_code_selectable,
            cc.mandatory_flag as course_combination_mandatory_flag
        FROM course_combination AS cc
        WHERE
            EXISTS (SELECT * FROM course c WHERE c.course_code = cc.course_code AND c.institution_code = 'AU')
        AND
            EXISTS (SELECT * FROM course c WHERE c.course_code = cc.course_code_selectable AND c.institution_code = 'AU')
    '''.trim()

    //TODO: ensure column name are aligned with the database table; parameterise institution_code
    /**
     * Source query for {@code course_combination_invalid}.
     *
     * Every alias must start with {@code course_combination_invalid_}: insert()
     * strips that prefix to obtain the target column name and the transformer
     * re-adds it to read the value back from the row.
     */
    private static final String COURSE_COMBINATION_INVALID_QUERY = '''
        SELECT
            cc.course_code_fdd as course_combination_invalid_course_code_fdd,
            cc.seq_num as course_combination_invalid_seq_num,
            cc.course_code_1 as course_combination_invalid_course_code_1,
            cc.course_code_2 as course_combination_invalid_course_code_2
        FROM course_combination_invalid AS cc
        WHERE
            EXISTS (SELECT * FROM course c WHERE c.course_code = cc.course_code_1 AND c.institution_code = 'AU')
        AND
            EXISTS (SELECT * FROM course c WHERE c.course_code = cc.course_code_2 AND c.institution_code = 'AU')
    '''.trim()

    /**
     * Target table name -> pull definition. Each definition holds the source
     * {@code query} and, optionally, {@code transformations}: closures keyed by
     * target column name that rewrite a non-blank source value before it is
     * rendered into the INSERT statement.
     *
     * Declared after the query constants because static initialisers run in
     * textual order.
     */
    private static final Map<String, Map<String, ?>> SCHEMA_MAPPING = [
            channel_course            : [
                    query          : CHANNEL_COURSE_QUERY,
                    transformations: [
                            // Keep only the text in front of the first " at ".
                            description           : { String description ->
                                final def index = description.indexOf(' at ')
                                return index > -1 ? description.substring(0, index) : description
                            },
                            // Swap the source institution code for the id of the matching
                            // InstitutionChannel; yields null when there is no match.
                            institution_channel_id: { String code -> InstitutionChannel.findByInstChannelCode(code)?.id }
                    ]
            ],
            course_combination        : [
                    query: COURSE_COMBINATION_QUERY
            ],
            course_combination_invalid: [
                    query: COURSE_COMBINATION_INVALID_QUERY
            ]
    ]

    /** Target database: the generated INSERT statements are executed here. */
    def dataSource
    /** Source database: the SELECT queries are executed here. */
    def dataSource_apply
    /** init()/destroy() are called around each database step executed on the async task thread. */
    def persistenceInterceptor

    /** Institution values that currently have a pull in flight. */
    private final Set<String> processing = ConcurrentHashMap.newKeySet()

    //TODO: ensure transactionality
    //TODO: expose via controller
    /**
     * Starts an asynchronous pull for the given institution.
     *
     * @param institution non-empty institution identifier
     * @return the promise created by {@code task}, or {@code null} when a pull
     *         for the same institution is already running
     * @throws IllegalStateException when {@code institution} is null or empty
     */
    def process(final String institution) {
        if (!institution) {
            throw new IllegalStateException("Institution id not permitted to be null or empty.")
        }

        if (!processing.add(institution)) {
            log.warn("Course Offering pull service is running for institution: ${institution}")
            return null
        }

        task {
            final long start = System.currentTimeMillis()
            try {
                final def apply_db = new Sql(dataSource_apply)
                final def inserts = run { download(apply_db, institution) }

                if (inserts == null) {
                    // run() returns null only when the download threw; uploading would
                    // just fail again on the missing payload and hide the real cause.
                    log.error("Course Offering download failed for institution: ${institution}; upload skipped")
                } else {
                    final def agent_portal_db = new Sql(dataSource)
                    final def results = run { upload(agent_portal_db, inserts) }
                    log.info("Results: $results")
                }
            } catch (Throwable t) {
                log.error("An exception occurred during the execution of the Course Offering pull service for institution: ${institution}", t)
            } finally {
                log.info("Course Offering pulling service for instititution (${institution}) is finished in ${System.currentTimeMillis() - start} ms")

                if (!processing.remove(institution)) {
                    throw new IllegalMonitorStateException("Internal state of CourseOfferingPullService#PROCESSING is invalid.")
                }
            }
        }
    }

    /**
     * Runs {@code action} between {@code persistenceInterceptor.init()} and
     * {@code persistenceInterceptor.destroy()}.
     *
     * @return the result of {@code action}, or {@code null} when it (or the
     *         interceptor initialisation) threw an {@link Exception}
     */
    private <T> T run(final Closure<T> action) {
        try {
            persistenceInterceptor.init()
            return action()
        } catch (Exception e) {
            // Best effort by design: report the failure with its stack trace and signal it with null.
            log.error("Course Offering pull step failed: ${e.message}", e)
        } finally {
            persistenceInterceptor.destroy()
        }
        return null
    }

    /**
     * Builds one INSERT statement per table in {@link #SCHEMA_MAPPING}.
     *
     * @param db          connection helper for the source database
     * @param institution currently unused, see the TODO below
     * @return target table name -> INSERT statement, or {@code null} as the
     *         value when the source query returned no rows
     */
    private static Map<String, String> download(final Sql db, final String institution) {
        //TODO: use institution parameter
        // NOTE(review): the queries still hard-code institution_code = 'AU'. Whether the
        // value passed to process() is that code or some other id is not visible here,
        // so it is deliberately not bound into the queries yet - confirm and parameterise.
        final Map<String, String> output = [:]
        SCHEMA_MAPPING.each { String table, Map<String, ?> context ->
            output[table] = insert(db, table, context.query as String, context.transformations as Map<String, ?>)
        }
        return output
    }

    /**
     * Executes the generated statements against the target database.
     *
     * @param db      connection helper for the target database
     * @param payload target table name -> INSERT statement (value may be null)
     * @return table name -> affected row count, {@code -1} when there was
     *         nothing to insert, or the error text when the statement failed;
     *         empty when {@code payload} is null
     */
    private static def upload(final Sql db, final Map<String, String> payload) {
        final Map<String, Object> output = [:]
        if (payload == null) {
            return output
        }

        for (final Map.Entry<String, String> entry in payload.entrySet()) {
            final String table = entry.key
            final String statement = entry.value
            try {
                output[table] = statement == null ? -1 : db.executeUpdate(statement)
            } catch (Exception e) {
                // Some exceptions (e.g. NullPointerException) carry no message; keep the type visible.
                output[table] = e.message ?: e.toString()
            }
        }
        return output
    }

    /**
     * Creates a closure {@code (row, column) -> SQL literal} for one table.
     *
     * The closure reads {@code row[prefix + column]}, applies the column's
     * transformation when one is registered and returns the value as a quoted,
     * escaped SQL string literal. It returns {@code null} - rendered as SQL
     * NULL by insert() - when the value is null or blank before or after the
     * transformation.
     *
     * @param prefix          alias prefix used by the source query ({@code table + '_'})
     * @param transformations optional column name -> closure map, may be null
     */
    private static final def createTransformer(final String prefix, final Map<String, ?> transformations) {
        return { final GroovyRowResult row, final String key ->
            def value = row[prefix + key]
            if (isAbsent(value)) {
                return null
            }

            if (transformations?.containsKey(key)) {
                final def transformation = transformations[key]
                value = transformation(value)
                // A transformation may find nothing (e.g. an unknown institution code);
                // that must become SQL NULL, not the string literal 'null'.
                if (isAbsent(value)) {
                    return null
                }
            }

            return quote(value)
        }
    }

    /** True for null and for strings that are empty or whitespace only. */
    private static boolean isAbsent(final Object value) {
        return value == null || (value instanceof String && value.isAllWhitespace())
    }

    /**
     * Renders a value as a single-quoted SQL string literal.
     *
     * Backslashes and single quotes are doubled so that data such as
     * "Master's" can neither break nor alter the generated statement.
     * NOTE(review): backslash doubling assumes the target database treats
     * backslash as an escape character inside string literals (the MySQL
     * default, i.e. NO_BACKSLASH_ESCAPES is off) - confirm.
     */
    private static String quote(final Object value) {
        final String escaped = value.toString().replace('\\', '\\\\').replace("'", "''")
        return "'" + escaped + "'"
    }

    /**
     * Runs {@code query} and renders all rows as one
     * {@code INSERT ... ON DUPLICATE KEY UPDATE} statement for {@code table}.
     *
     * Target column names are the query's column aliases with the leading
     * {@code table + '_'} removed.
     *
     * @return the statement, or {@code null} when the query returned no rows
     */
    private static final String insert(final Sql db, final String table, final String query, final Map<String, ?> transformations) {
        final List<GroovyRowResult> rows = db.rows(query)
        if (rows.isEmpty()) {
            return null
        }

        final String prefix = table + '_'
        final def transformer = createTransformer(prefix, transformations)

        // Strip the prefix literally and only at the start of the alias.
        final List<String> columns = rows.first().keySet().collect { alias ->
            final String name = alias.toString()
            name.startsWith(prefix) ? name.substring(prefix.length()) : name
        }

        // A null from the transformer is joined as the bare word null, i.e. SQL NULL.
        final String values = rows.collect { row ->
            '(' + columns.collect { column -> transformer(row, column) }.join(', ') + ')'
        }.join(', ')

        final String assignments = columns.collect { column -> "$column = VALUES($column)".toString() }.join(', ')

        return [
                "INSERT INTO $table (${columns.join(', ')})",
                "VALUES $values",
                "ON DUPLICATE KEY UPDATE $assignments"
        ].join('\n')
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment