Skip to content

Instantly share code, notes, and snippets.

Created March 27, 2017 06:47
Show Gist options
  • Save MarkusPfundstein/246446e428ab10dbd2383cf3204a19d0 to your computer and use it in GitHub Desktop.
Save MarkusPfundstein/246446e428ab10dbd2383cf3204a19d0 to your computer and use it in GitHub Desktop.
JavaScript Reader.T(Task) example (why use monads)
const Task = require('data.task');
const Maybe = require('data.maybe');
const { Reader, ReaderT } = require('ramda-fantasy');
const R = require('ramda');
const mysql = require('mysql');
const request = require('request');
// ReaderTask: ramda-fantasy's Reader transformer applied to data.task's Task.
// A ReaderTask wraps a function `env => Task`, so each step in this script can
// read the shared environment (see createEnvironment) and run asynchronously.
const ReaderTask = Reader.T(Task);
// Writes to stderr, prefixed with a fixed marker so failures stand out.
const logE = (...parts) => {
  return console.error('oh oh error', ...parts);
};

// Pass-through to console.log; handy as a point-free callback (e.g. in fork).
const log = (...parts) => {
  return console.log(...parts);
};

/**
 * Total version of "first element".
 * @param {ArrayLike<*>} xs
 * @returns {Maybe<*>} Just(xs[0]) when xs is non-empty, Nothing otherwise.
 */
const safeHead = (xs) => {
  if (xs.length > 0) {
    return Maybe.Just(xs[0]);
  }
  return Maybe.Nothing();
};
// Static settings for the export. The connection values are blank in this
// copy and must be filled in before running.
const config = {
  // Connection options, handed as-is to mysql.createConnection by createEnvironment.
  sql: {
    host: '',
    user: '',
    password: '',
    database: '',
  },
  // Elasticsearch target. putPublicationToElastic builds the upload URL from
  // `port` and `index` (and, presumably, `host`).
  elasticSearch: {
    host: '',
    port: '',
    index: 'docs',
    // Upload throttling. `delay` (ms) and `size` presumably feed the per-batch
    // pause and batch size used by the batched upload; the code that reads
    // them is not visible here — confirm.
    batchUpload: {
      delay: 250,
      size: 50,
    },
  },
};
/**
 * Builds the environment that the ReaderTasks in this script read from
 * (`env.sqlConnection`, `env.elasticSearch`).
 *
 * @param {object} config - shaped like `config`: `sql` is passed to
 *   mysql.createConnection, `elasticSearch` is carried over unchanged.
 * @returns {{sqlConnection: object, elasticSearch: object}}
 */
const createEnvironment = (config) => ({
  sqlConnection: mysql.createConnection(config.sql),
  elasticSearch: config.elasticSearch,
});
/**
 * Tells whether an HTTP response from Elasticsearch counts as a success.
 * Only 200 (OK) and 201 (Created) are accepted; any other status, including
 * other 2xx codes, is treated as a failure.
 *
 * @param {{statusCode: number}} response - response object from `request`
 *   (named `response` so it does not shadow the `request` module).
 * @returns {boolean}
 */
const checkRequestOk = (response) => response.statusCode === 200 || response.statusCode === 201;
/**
 * Uploads one publication to Elasticsearch with an HTTP PUT to
 * `http://<host>:<port>/<index>/external/<id>`, taking the target from
 * `env.elasticSearch`.
 *
 * @param {{id: *}} publication - document to index; also sent as the JSON body.
 * @returns {ReaderTask} resolves with the response body on 200/201; rejects
 *   with the transport error, or with the response body for any other status.
 */
const putPublicationToElastic = (publication) => ReaderTask((env) => {
  const { id } = publication;
  const { host, port, index } = env.elasticSearch;
  // Encode the id so characters like '/' or '?' cannot alter the URL path.
  const url = `http://${host}:${port}/${index}/external/${encodeURIComponent(id)}`;
  return new Task((rej, res) => {
    log(`upload publication ${id}`);
    request({
      url,
      method: 'PUT',
      json: publication,
    }, (error, response, body) => {
      if (error) {
        rej(error);
      } else if (checkRequestOk(response)) {
        res(body);
      } else {
        rej(body);
      }
    });
  });
});
/**
 * Lifts a raw SQL string into a ReaderTask. When the ReaderTask is run with an
 * environment and the resulting Task is forked, the query is sent over
 * `env.sqlConnection`; the Task resolves with the rows handed to the query
 * callback, or rejects with the driver's error.
 *
 * @param {string} query - SQL text, passed to the driver unchanged.
 * @returns {ReaderTask}
 */
const sqlQuery = (query) => {
  const toTask = (env) =>
    new Task((reject, resolve) => {
      const settle = (err, rows) => (err ? reject(err) : resolve(rows));
      return env.sqlConnection.query(query, settle);
    });
  return ReaderTask(toTask);
};
/**
 * Ends the SQL connection held in `env.sqlConnection`, then passes `data`
 * through unchanged. Its `data => ReaderTask` shape makes it fit in a `.chain`.
 *
 * Closing is best-effort: the Task always resolves with `data`, but a failed
 * close is now reported through logE instead of being silently dropped.
 *
 * @param {*} data - value to forward once the connection has been ended.
 * @returns {ReaderTask} resolving to `data`.
 */
const closeConnection = (data) => ReaderTask((env) =>
  new Task((rej, res) =>
    env.sqlConnection.end((error) => {
      if (error) {
        logE('failed to close SQL connection', error);
      }
      res(data);
    })));
/**
 * Looks up the extracted-text row for `file` in `files_text`.
 *
 * @param {{id: *}} file - presumably a row from the `files` table.
 * @returns {ReaderTask} resolving to Just({ txt }) for the first matching row,
 *   or Nothing when the file has no text row.
 */
const tryGetFileTextFromFile = (file) =>
  // NOTE(review): the id is interpolated, not escaped — only safe while
  // `file.id` is a trusted numeric id.
  sqlQuery(`select txt from files_text where file_id = ${file.id} limit 1`)
    .map(safeHead);
/**
 * Looks up the publication that `file` belongs to.
 *
 * @param {{publication_id: *}} file - row whose `publication_id` is matched
 *   against `publications.id`.
 * @returns {ReaderTask} resolving to Just(publicationRow) when found, Nothing otherwise.
 */
const tryGetPublicationFromFile = (file) =>
  // NOTE(review): the id is interpolated, not escaped — only safe while
  // `file.publication_id` is a trusted numeric id.
  sqlQuery(`select * from publications where id = ${file.publication_id} limit 1`)
    .map(safeHead);
const createUploadDocument = R.curry((file, maybeText, maybePublication) =>
Nothing: () =>({}),
Just: textRow => {
const publication = maybePublication.getOrElse({});
return {
id :,
abstract: file.abstract,
publishDate: publication.publishdate || null,
title: file.title,
type: file.type,
text: textRow.txt // text not for result
// getAllTexts: ReaderTask meant to produce the list of upload documents.
// It queries every row of `files`, then appears to fold over the rows with a
// `(result, file)` accumulator. Starting from `ReaderTask(_ => Task.of([]))`,
// each step appends one new document (`[...xs, newDoc]`).
// NOTE(review): this copy is syntactically incomplete. The call that applies
// the fold to the query result (before `(result, file) =>`) is missing. So is
// the expression that produces each `newDoc` (before `.map(newDoc => ...)`),
// presumably combining tryGetFileTextFromFile, tryGetPublicationFromFile and
// createUploadDocument. Restore both from the original gist before running.
const getAllTexts =
sqlQuery(`select * from files`)
(result, file) =>
result.chain(xs =>
.map(newDoc => [...xs, newDoc])),
ReaderTask(_ => Task.of([]))));
/**
 * A ReaderTask that ignores its environment and, once forked, resolves
 * (with no value) after `delay` milliseconds via setTimeout.
 *
 * @param {number} delay - milliseconds to wait.
 * @returns {ReaderTask}
 */
const wait = (delay) => {
  const sleep = (_reject, resolve) => setTimeout(resolve, delay);
  return ReaderTask((_env) => new Task(sleep));
};
// Reduces a list of batches of publications to one ReaderTask that uploads
// them batch by batch:
//   [[pub1, ..., pub9], [pub10, ..., pub19], ...]
// Before each batch (including the first), `delay` milliseconds are waited.
// Then every publication in that batch is uploaded via R.traverse. The
// ReaderTask resolves with the upload response bodies of all batches,
// concatenated in batch order.
const uploadPubsWithDelay = (delay) =>
  R.reduce(
    (result, batch) =>
      result.chain((uploaded) =>
        wait(delay)
          .chain(() =>
            R.traverse(ReaderTask.of, putPublicationToElastic, batch)
              .map((responses) => [...uploaded, ...responses]))),
    ReaderTask((_env) => Task.of([])),
  );
// delayedBatchUpload: uploads `pubs` in batches. It reads the environment via
// `ReaderTask.ask` so that batch settings can come from `env`.
// NOTE(review): the body of this function is missing from this copy; only the
// `ReaderTask.ask.chain(env =>` opener and its comments survive. Presumably it
// split `pubs` into batches of `env.elasticSearch.batchUpload.size` and handed
// them to `uploadPubsWithDelay(env.elasticSearch.batchUpload.delay)`. Confirm
// against the original gist before restoring.
const delayedBatchUpload = pubs => ReaderTask.ask.chain(env =>
// make batches
// we upload with x ms delay between every batch, in the hope that
// Elasticsearch won't get overloaded (without a delay, it does).
// runApp: runs the export against `env` and forks the resulting Task. On
// success the final message is printed with `log`; on failure, logE prints
// the error.
// NOTE(review): several links of this chain are missing from this copy. The
// receiver before the first `.map` is gone, as are the steps that perform the
// upload and turn the ReaderTask into a Task before `.fork`. Presumably these
// are getAllTexts, delayedBatchUpload, closeConnection and `.run(env)`; confirm
// against the original gist.
const runApp = env =>
// filter all publications without text
// log how many documents were fetched
.map(R.tap(pubs => log(`fetched ${pubs.length} documents`)))
// keep only documents with a non-empty `text`
.map(R.filter(pub => pub.text && pub.text.length > 0))
.map(R.tap(pubs => log(`try to upload ${pubs.length} documents`)))
.map(inserted => `inserted ${inserted.length} documents`)
// TODO: also close the SQL connection when the pipeline fails
.fork(logE, log)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment