Last active
November 9, 2015 00:50
-
-
Save ewhitebloom/a3b812e0847d9e9b310e to your computer and use it in GitHub Desktop.
Imports data from CMS CSV flat files. Uses the beauty of the SmartyStreets API!
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bytes" | |
"database/sql" | |
"encoding/csv" | |
"encoding/json" | |
"fmt" | |
_ "github.com/go-sql-driver/mysql" | |
"io/ioutil" | |
"net/http" | |
"os" | |
"strings" | |
"sync" | |
"time" | |
) | |
// Settings read from the environment once, at package initialisation.
var (
	// MYSQLAUTH is the data source name handed to sql.Open for the "mysql" driver.
	MYSQLAUTH = os.Getenv("GO_MYSQLAUTH")

	// Paths of the input CSV files, one per provider type.
	LTACHFilePath   = os.Getenv("LTACH_FILE")
	IRFFilePath     = os.Getenv("IRF_FILE")
	HospiceFilePath = os.Getenv("HOSPICE_FILE")

	// badDataFilePath is the file that receives rows rejected by the import.
	badDataFilePath = os.Getenv("BAD_PROVIDER_FILE")

	// SmartyStreets credentials; both are appended to the request URL.
	SMARTYSTREETS_TOKEN = os.Getenv("SMARTYSTREETS_TOKEN")
	SMARTYSTREETS_ID    = os.Getenv("SMARTYSTREETS_ID")
)

// fileToProviderType maps each input file path to the provider type stored
// for every row read from that file.
var fileToProviderType = map[string]string{
	LTACHFilePath:   "LongTermCareHospital",
	IRFFilePath:     "AcuteRehabFacility",
	HospiceFilePath: "Hospice",
}

// wg tracks the per-record goroutines started by ProcessFile; main waits on it.
var wg sync.WaitGroup
// Provider is one care provider row on its way from a CSV file into the
// database. Every field is a nullable SQL value, so a field that was never
// set is passed to the driver as NULL.
type Provider struct {
	// Identity.
	providerType sql.NullString
	name         sql.NullString

	// Postal address; rewritten by SanitizeAndGeocodeProviderAddress.
	street   sql.NullString
	city     sql.NullString
	state    sql.NullString
	zip      sql.NullString
	plusFour sql.NullString

	// Coordinates filled in by geocoding.
	latitude  sql.NullFloat64
	longitude sql.NullFloat64

	// Contact and registry details.
	phoneNumber            sql.NullString
	medicareProviderNumber sql.NullString
}
// SmartyStreetsRequest is a single address lookup as serialised into the
// JSON body sent to the SmartyStreets street-address endpoint.
//
// The field order is significant: the request is built elsewhere in this
// file with an unkeyed composite literal.
type SmartyStreetsRequest struct {
	Street  string `json:"street"`
	City    string `json:"city"`
	State   string `json:"state"`
	Zipcode string `json:"zipcode"`

	// Candidates is sent as the "candidates" property of the lookup.
	Candidates int `json:"candidates"`
}
type SmartyStreetsResponse struct { | |
Street string `json:"delivery_line_1"` | |
Components SmartyStreetsComponents | |
Metadata SmartyStreetsMetaData | |
} | |
// SmartyStreetsComponents holds the address parts decoded from a
// SmartyStreets candidate.
type SmartyStreetsComponents struct {
	CityName string `json:"city_name"`
	State    string `json:"state_abbreviation"`
	Zipcode  string `json:"zipcode"`

	// Plus4 is decoded from "plus4_code"; callers only use it when it is
	// exactly four characters long.
	Plus4 string `json:"plus4_code"`
}
// SmartyStreetsMetaData holds the coordinates decoded from a SmartyStreets
// candidate. The fields are untagged, so encoding/json matches them to the
// object keys by field name, ignoring case.
type SmartyStreetsMetaData struct {
	Latitude, Longitude float64
}
func (p Provider) String() string { | |
return fmt.Sprintf("{ type: %v, name: %v, street: %v, city: %v, state: %v, zip: %v, latitude: %v, longitude: %v, phone: %v, medicareNumber: %v }", p.providerType.String, p.name.String, p.street.String, p.city.String, p.state.String, p.zip.String, p.latitude.Float64, p.longitude.Float64, p.phoneNumber.String, p.medicareProviderNumber.String) | |
} | |
func (p *Provider) ToCSVRow() string { | |
return fmt.Sprintf("%v, %v, %v, %v, %v, %v, %v, %v, %v, %v", p.providerType.String, p.name.String, p.street.String, p.city.String, p.state.String, p.zip.String, p.latitude.Float64, p.longitude.Float64, p.phoneNumber.String, p.medicareProviderNumber.String) | |
} | |
func (p *Provider) SanitizeAndGeocodeProviderAddress() bool { | |
queryString := "https://api.smartystreets.com/street-address?auth-id=" + SMARTYSTREETS_ID + "&auth-token=" + SMARTYSTREETS_TOKEN | |
smartyRequest := SmartyStreetsRequest{p.street.String, p.city.String, p.state.String, p.zip.String, 1} | |
requestJSON, err := json.Marshal(smartyRequest) | |
if err != nil { | |
panic(err.Error()) | |
} | |
requestJSON = []byte("[" + string(requestJSON) + "]") | |
request, err := http.NewRequest("POST", queryString, bytes.NewReader(requestJSON)) | |
if err != nil { | |
panic(err.Error()) | |
} | |
request.Header.Set("Content-Type", "application/json") | |
request.Header.Set("X-Include-Invalid", "true") | |
client := http.Client{} | |
resp, err := client.Do(request) | |
if err != nil { | |
panic(err.Error()) | |
} | |
defer resp.Body.Close() | |
body, err := ioutil.ReadAll(resp.Body) | |
if err != nil { | |
panic(err.Error()) | |
} | |
data := SmartyStreetsResponse{} | |
stringBody := string(body) | |
stringBody = strings.Replace(stringBody, "[", "", -1) | |
stringBody = strings.Replace(stringBody, "]", "", -1) | |
err2 := json.NewDecoder(strings.NewReader(stringBody)).Decode(&data) | |
if err2 != nil { | |
panic(err.Error()) | |
} | |
if data.Metadata.Latitude == 0 || data.Metadata.Longitude == 0 || data.Street == "" { | |
return false | |
} | |
if len(data.Components.Plus4) == 4 { | |
p.plusFour.Scan(data.Components.Plus4) | |
} | |
p.street.Scan(data.Street) | |
p.city.Scan(data.Components.CityName) | |
p.state.Scan(data.Components.State) | |
p.zip.Scan(data.Components.Zipcode) | |
p.latitude.Scan(data.Metadata.Latitude) | |
p.longitude.Scan(data.Metadata.Longitude) | |
return true | |
} | |
func (p *Provider) QualityDataProvider() bool { | |
if !p.name.Valid || !p.street.Valid || !p.city.Valid || !p.state.Valid || !p.medicareProviderNumber.Valid { | |
return false | |
} | |
return true | |
} | |
func PersistRowCareProvider(provider Provider, transaction *sql.Tx) (bool, int64) { | |
result, err := transaction.Exec("INSERT INTO care_providers (type, name, phone_number, medicare_provider_number, onboarding_stage, updated_at, created_at) VALUES (?,?,?,?,'red',NOW(),NOW())", provider.providerType, provider.name, provider.phoneNumber, provider.medicareProviderNumber) | |
if err != nil { | |
fmt.Println(err.Error()) | |
return false, 0 | |
} else if result != nil { | |
providerId, err := result.LastInsertId() | |
if err != nil || providerId == 0 { | |
fmt.Println(err.Error()) | |
return false, 0 | |
} else { | |
return true, providerId | |
} | |
} | |
return false, 0 | |
} | |
func PersistRowAddress(provider Provider, transaction *sql.Tx, providerId int64) bool { | |
_, err := transaction.Exec("INSERT INTO addresses (addressed_id, addressed_type, street, city, state, zip_code, plus_four, latitude, longitude, created_at, updated_at) VALUES (?,'CareProvider',?,?,?,?,?,?,?,NOW(),NOW())", providerId, provider.street, provider.city, provider.state, provider.zip, provider.plusFour, provider.latitude, provider.longitude) | |
if err != nil { | |
fmt.Println(err.Error()) | |
return false | |
} | |
return true | |
} | |
// QueryStringBuilder returns s with every space character replaced by
// "%20". No other character is escaped.
func QueryStringBuilder(s string) string {
	return strings.Replace(s, " ", "%20", -1)
}
// Builds a Provider struct, assigning a field only if it's present in the CSV row, otherwise defaults to nil. | |
// Avoids abstracting iteration through struct fields for performance benefit. | |
// These attributes are used to feed the SmartyStreetsAPI, | |
// which then changes them where appropriate, to better formatting. | |
func BuildProvider(row *[]string, providerType string) Provider { | |
provider := Provider{} | |
provider.providerType.Scan(providerType) | |
if len((*row)[17]) == 5 { | |
provider.medicareProviderNumber.Scan("0" + (*row)[17]) | |
} else { | |
provider.medicareProviderNumber.Scan((*row)[17]) | |
} | |
if len((*row)[11]) > 0 { | |
provider.name.Scan((*row)[11]) | |
} | |
if len((*row)[23]) > 0 { | |
provider.street.Scan((*row)[23]) | |
} | |
if len((*row)[4]) > 0 { | |
provider.city.Scan((*row)[4]) | |
} | |
if len((*row)[20]) > 0 { | |
provider.state.Scan((*row)[20]) | |
} | |
if len((*row)[29]) >= 5 { | |
provider.zip.Scan((*row)[29]) | |
} | |
if len((*row)[24]) == 10 { | |
provider.phoneNumber.Scan((*row)[24][:3] + "-" + (*row)[24][3:6] + "-" + (*row)[24][6:]) | |
} | |
return provider | |
} | |
func AnyDuplicates(provider Provider, transaction *sql.Tx) (bool, error) { | |
rows, err := transaction.Query("SELECT * FROM care_providers WHERE medicare_provider_number=?", provider.medicareProviderNumber) | |
defer rows.Close() | |
if err == nil && !rows.Next() { | |
return false, err | |
} else if err != nil { | |
return false, err | |
} else { | |
return true, err | |
} | |
} | |
func ProcessFile(levelOfCareFilePath string, db *sql.DB, badDataFile *os.File) { | |
file, err := os.Open(levelOfCareFilePath) | |
if err != nil { | |
panic(err.Error()) | |
} | |
defer file.Close() | |
if err != nil { | |
panic(err.Error()) | |
} | |
reader := csv.NewReader(file) | |
records, err := reader.ReadAll() | |
for _, record := range records[1:] { | |
wg.Add(1) | |
go func(record []string, levelOfCareFilePath string) { | |
defer wg.Done() | |
provider := BuildProvider(&record, fileToProviderType[levelOfCareFilePath]) | |
if !provider.QualityDataProvider() { | |
badDataFile.WriteString(provider.ToCSVRow() + "\n") | |
return | |
} | |
for { | |
transaction, err := db.Begin() | |
if err != nil { | |
panic(err.Error()) | |
} | |
any, err := AnyDuplicates(provider, transaction) | |
if err != nil { | |
panic(err.Error()) | |
} else if any { | |
transaction.Rollback() | |
break | |
} | |
successfulGeocode := provider.SanitizeAndGeocodeProviderAddress() | |
if !successfulGeocode { | |
badDataFile.WriteString(provider.ToCSVRow() + "\n") | |
transaction.Rollback() | |
return | |
} | |
success, id := PersistRowCareProvider(provider, transaction) | |
if success { | |
success2 := PersistRowAddress(provider, transaction, id) | |
if success2 { | |
err := transaction.Commit() | |
if err == nil { | |
break | |
} else { | |
panic(err.Error()) | |
} | |
} else { | |
transaction.Rollback() | |
} | |
} else { | |
transaction.Rollback() | |
} | |
} | |
}(record, levelOfCareFilePath) | |
} | |
} | |
func main() { | |
db, err := sql.Open("mysql", MYSQLAUTH) | |
if err != nil { | |
panic(err.Error()) | |
} | |
defer db.Close() | |
db.SetMaxOpenConns(5) | |
err2 := db.Ping() | |
if err2 != nil { | |
panic(err2.Error()) | |
} | |
// Dumps bad quality data rows into a file. | |
badDataFile, err3 := os.Create(badDataFilePath) | |
if err3 != nil { | |
panic(err.Error()) | |
} | |
badDataFile.WriteString("providerType, name, street, city, state, zip_code, latitude, longitude, phone, medicare_provider_number\n") | |
start := time.Now() | |
for _, levelOfCareFilePath := range [3]string{LTACHFilePath, IRFFilePath, HospiceFilePath} { | |
ProcessFile(levelOfCareFilePath, db, badDataFile) | |
} | |
wg.Wait() | |
fmt.Println(time.Since(start)) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment