This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import click | |
import mlflow | |
from hyperopt import fmin, hp, tpe, rand | |
from sklearn.linear_model import LogisticRegression | |
from sklearn.metrics import precision_recall_fscore_support | |
from sklearn.model_selection import train_test_split, cross_val_score | |
from sklearn.pipeline import make_pipeline |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Test | |
public void testSpaceAndTimeSplit() throws Exception { | |
TestStream<GenericRecord> input = TestStream.create(GenericRecord.GenericRecordCoder.of()) | |
.addElements( | |
TimestampedValue.of(new GenericRecord(1, Instant.ofEpochSecond(0), new Point(1.f, 1.f)), Instant.ofEpochSecond(0)), | |
TimestampedValue.of(new GenericRecord(1, Instant.ofEpochSecond(10), new Point(2.000001f, 2.000001f)), Instant.ofEpochSecond(10)), | |
TimestampedValue.of(new GenericRecord(1, Instant.ofEpochSecond(20), new Point(1.000002f, 1.000002f)), Instant.ofEpochSecond(20)) | |
) | |
.advanceWatermarkToInfinity(); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Override | |
public void mergeWindows(MergeContext c) throws Exception { | |
List<TiledIntervalWindow> sortedWindows = Lists.newArrayList(c.windows()); | |
switch (strategy) { | |
case SPACE_AND_TIME: | |
sortedWindows.sort(TiledIntervalWindow.SPACE_AND_TIME_COMPARATOR); | |
break; | |
case TIME_ONLY: | |
sortedWindows.sort(TiledIntervalWindow.TIME_ONLY_COMPARATOR); | |
break; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public static final Comparator<TiledIntervalWindow> TIME_ONLY_COMPARATOR = (o1, o2) -> ComparisonChain.start() | |
.compare(o1.start, o2.start) | |
.compare(o1.end, o2.end) | |
.result(); | |
public static final Comparator<TiledIntervalWindow> SPACE_AND_TIME_COMPARATOR = (o1, o2) -> ComparisonChain.start() | |
.compare(o1.hexAddr, o2.hexAddr) | |
.compare(o1.start, o2.start) | |
.compare(o1.end, o2.end) | |
.result(); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Override | |
public Collection<TiledIntervalWindow> assignWindows(AssignContext c) throws Exception { | |
H3Core h3 = H3Core.newInstance(); | |
Point point = c.element().getValue().getPoint(); | |
String hexAddr = h3.geoToH3Address(point.getLatitude(), point.getLongitude(), h3Resolution); | |
TiledIntervalWindow tiw = new TiledIntervalWindow( | |
hexAddr, | |
c.timestamp(), |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class TiledIntervalWindow extends BoundedWindow { | |
private final String hexAddr; | |
private final Instant start; | |
private final Instant end; | |
public TiledIntervalWindow(String hexAddr, Instant start, Instant end) { | |
this.hexAddr = hexAddr; | |
this.start = start; | |
this.end = end; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Scatter-plot the recorded positions of each taxi in `taxis` on one large figure,
# giving every taxi its own marker shape.
# NOTE(review): the listing lost its indentation; the axis limits are assumed to be
# set once after the loop -- confirm against the original notebook.
marker_cycle = ['v', 'x', '+']  # , '.', ',', 'o', '+']
plt.figure(figsize=(25, 25))
for taxi_id, taxi_marker in zip(taxis, marker_cycle):
    log_path = 'release/taxi_log_2008_by_id/{taxi_id}.txt'.format(taxi_id=taxi_id)
    taxi_df = read_taxi_data_from_file(log_path)
    longitudes = taxi_df['longitude'].values.flatten()
    latitudes = taxi_df['latitude'].values.flatten()
    plt.scatter(longitudes, latitudes, marker=taxi_marker, alpha=0.5)
# Fix the visible longitude / latitude range of the axes.
plt.xlim(116.1, 116.8)
plt.ylim(39.75, 40.10)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Pick one random row of the pivot table and ask the fitted model for its
# three nearest neighbours, then print each neighbour with its distance.
query_index = np.random.choice(taxi_data_pivot.shape[0])
query_row = taxi_data_pivot.iloc[query_index, :].values.reshape(1, -1)
distances, indexes = model_knn.kneighbors(query_row, n_neighbors=3)

# kneighbors returns 2-D arrays (one row per query); flatten once and reuse.
flat_distances = distances.flatten()
flat_indexes = indexes.flatten()

# Translate positional neighbour indexes back into pivot-table index labels.
taxis = [taxi_data_pivot.index[idx] for idx in flat_indexes]
for rank, (taxi_label, distance) in enumerate(zip(taxis, flat_distances)):
    print('{0}: {1}, with distance of {2}'.format(rank, taxi_label, distance))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from sklearn.neighbors import NearestNeighbors

# Convert the pivot table's values to a sparse CSR matrix and fit a brute-force
# nearest-neighbour model on it using cosine distance.
taxi_data_matrix = csr_matrix(taxi_data_pivot.values)
model_knn = NearestNeighbors(algorithm='brute', metric='cosine').fit(taxi_data_matrix)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Build a taxi_id x geohash indicator table: 1 where the pair appears in
# taxi_data, 0.0 everywhere else.
taxi_data['flag'] = 1
# DataFrame.pivot raises ValueError ("Index contains duplicate entries") when an
# (index, columns) pair occurs more than once, so keep a single row per
# (taxi_id, geohash) pair before reshaping. The flag is 1 on every row, so
# dropping repeats does not change the resulting table.
unique_visits = taxi_data.drop_duplicates(subset=['taxi_id', 'geohash'])
taxi_data_pivot = unique_visits.pivot(index='taxi_id', columns='geohash', values='flag').fillna(0.0)
Newer | Older