Last active
August 6, 2023 16:21
-
-
Save pietjepuk2/eb021db978ad20bfd94dce485be63150 to your computer and use it in GitHub Desktop.
Get validator duties (find largest gap in which to update for that $0.04 it will save/benefit you)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import math | |
from datetime import datetime, timedelta | |
import requests | |
# Number of slots in one epoch; used to convert between slot and epoch numbers.
SLOTS_PER_EPOCH = 32
# Duration of a single slot, in seconds; used to convert slots to wall-clock time.
SECONDS_PER_SLOT = 12
def _print_longest_gap(gap_store):
    """Print the longest gap recorded in ``gap_store``.

    ``gap_store`` is a dict holding a ``"longest_gap"`` timedelta and a
    ``"gap_time_range"`` ``(start, end)`` tuple, as maintained by ``main``.
    """
    print("*" * 80)
    gap_start, gap_end = gap_store["gap_time_range"]
    if gap_start is None:
        # No positive gap was ever recorded (e.g. the only remaining duty of
        # the epoch is already in progress), so there is no range to format.
        print("No gap found")
        return

    gap_seconds = gap_store["longest_gap"].total_seconds()
    print(
        f"{gap_seconds} seconds"
        f" ({int(gap_seconds) // SECONDS_PER_SLOT} slots),"
        f" from {gap_start.strftime('%H:%M:%S')}"
        f" until {gap_end.strftime('%H:%M:%S')}"
    )


def main(
    validators_indices,
    eth2_api_url="http://localhost:5052/eth/v1/",
    genesis_timestamp=1606824023,
):
    """Print upcoming duties for the given validators and the longest idle gaps.

    Queries the beacon node for the head slot, the attester duties of the
    current and next epoch and the proposer duties of the current epoch, then
    prints every upcoming duty slot together with the idle time before it.
    Finally reports the longest gap within the current epoch and the longest
    gap overall (up to the first slot of the epoch after next).

    Args:
        validators_indices: Iterable of validator indices (ints) to look up.
        eth2_api_url: Base URL of the beacon node API; endpoints are appended
            directly, so it must end with a slash.
        genesis_timestamp: Unix timestamp of slot 0. The default
            (2020-12-01 12:00:23 UTC) is the value the script always used.

    Raises:
        RuntimeError: If the beacon node answers with an error status or with
            a body that has no ``"data"`` member.
    """

    def unwrap(response):
        # Surface API errors with their message instead of failing later with
        # an opaque ``KeyError: 'data'``.
        try:
            payload = response.json()
        except ValueError:
            payload = None
        if response.ok and isinstance(payload, dict) and "data" in payload:
            return payload["data"]
        detail = payload.get("message", payload) if isinstance(payload, dict) else response.text
        raise RuntimeError(
            f"Beacon API request to {response.url} failed"
            f" (HTTP {response.status_code}): {detail}"
        )

    def api_get(endpoint):
        return unwrap(requests.get(f"{eth2_api_url}{endpoint}"))

    def api_post(endpoint, payload):
        # ``json=`` serialises the payload and sets the JSON Content-Type
        # header, which a plain ``data=<str>`` body does not carry.
        return unwrap(requests.post(f"{eth2_api_url}{endpoint}", json=payload))

    def slot_start_time(slot):
        # Naive local time, consistent with ``datetime.now()`` used below.
        return datetime.fromtimestamp(genesis_timestamp + slot * SECONDS_PER_SLOT)

    def update_gap(start, end, gap_store):
        # A negative gap (duty slot already started because the head lags the
        # wall clock) never exceeds the stored maximum and is simply ignored.
        gap = end - start
        if gap > gap_store["longest_gap"]:
            gap_store["longest_gap"] = gap
            gap_store["gap_time_range"] = (start, end)

    head_slot = int(api_get("beacon/headers/head")["header"]["message"]["slot"])
    epoch = head_slot // SLOTS_PER_EPOCH

    # The Beacon API describes the request body as an array of string-encoded
    # validator indices.
    index_payload = [str(index) for index in validators_indices]
    cur_epoch_data = api_post(f"validator/duties/attester/{epoch}", index_payload)
    next_epoch_data = api_post(f"validator/duties/attester/{epoch + 1}", index_payload)

    # slot -> list of printable duty labels, restricted to slots after the head.
    duties = {}
    for duty in (*cur_epoch_data, *next_epoch_data):
        slot = int(duty["slot"])
        if slot > head_slot:
            duties.setdefault(slot, []).append(str(duty["validator_index"]))

    validators_indices_set = set(validators_indices)
    for duty in api_get(f"validator/duties/proposer/{epoch}"):
        slot = int(duty["slot"])
        if slot <= head_slot:
            continue
        prop_index = int(duty["validator_index"])
        if prop_index in validators_indices_set:
            duties.setdefault(slot, []).append(f"{prop_index} (proposal)")

    # Also insert (still unknown) attestation duties at epoch after next,
    # assuming worst case of having to attest at its first slot. The empty
    # list marks this entry as a sentinel rather than a real duty.
    duties.setdefault((epoch + 2) * SLOTS_PER_EPOCH, [])
    duties = dict(sorted(duties.items()))

    print("Calculating attestation/proposal slots and gaps for validators:")
    print(f" {validators_indices}")
    print("\nUpcoming voting/proposal slots and gaps")
    print("(Gap in seconds)")
    print("(slot/epoch - time range - validators)")
    print("*" * 80)

    # Current time floored to whole seconds.
    prev_end_time = datetime.now().replace(microsecond=0)

    cur_epoch_gap_store = {"longest_gap": timedelta(seconds=0), "gap_time_range": (None, None)}
    overall_gap_store = cur_epoch_gap_store.copy()

    next_epoch_start_slot = (epoch + 1) * SLOTS_PER_EPOCH
    next_epoch_start_time = slot_start_time(next_epoch_start_slot)
    in_next_epoch = False

    for slot, validators in duties.items():
        slot_start = slot_start_time(slot)
        slot_end = slot_start + timedelta(seconds=SECONDS_PER_SLOT)

        suf = ""
        if not in_next_epoch and slot >= next_epoch_start_slot:
            # First duty past the epoch boundary: report the boundary and close
            # the current-epoch gap at the boundary itself.
            print("- " * 40)
            print(
                f"Time until epoch change: {math.floor((next_epoch_start_time - prev_end_time).total_seconds())} seconds"
            )
            print(
                f"Epoch boundary (proposal duties are not yet known for next epoch): {next_epoch_start_time}"
            )
            print(
                f"Time until next duty: {math.floor((slot_start - next_epoch_start_time).total_seconds())} seconds"
            )
            print("- " * 40)
            suf = "(after prev. slot duty or current time)"
            update_gap(prev_end_time, next_epoch_start_time, cur_epoch_gap_store)
            in_next_epoch = True

        print(f"Gap - {math.floor((slot_start - prev_end_time).total_seconds())} seconds {suf}")
        if validators:
            print(
                f" {slot}/{slot // SLOTS_PER_EPOCH}"
                f" - {slot_start.strftime('%H:%M:%S')} until {slot_end.strftime('%H:%M:%S')}"
                f" - [{', '.join(validators)}]"
            )
        else:
            # Only the sentinel entry has no validators; it sits on an epoch start.
            assert slot % SLOTS_PER_EPOCH == 0

        update_gap(prev_end_time, slot_start, overall_gap_store)
        if not in_next_epoch:
            update_gap(prev_end_time, slot_start, cur_epoch_gap_store)
        prev_end_time = slot_end

    print("\nLongest attestation and proposer duty gap (only current epoch, first):")
    _print_longest_gap(cur_epoch_gap_store)
    print("\nLongest attestation gap (first):")
    _print_longest_gap(overall_gap_store)
if __name__ == "__main__":
    # Command-line entry point: validator indices are positional, and the beacon
    # node URL may optionally be overridden.
    import argparse

    parser = argparse.ArgumentParser(
        description="Show validator duties of current and next epoch to find largest gap."
    )
    parser.add_argument("indices", metavar="index", type=int, nargs="+", help="validator indices")
    parser.add_argument(
        "--api-url",
        default="http://localhost:5052/eth/v1/",
        help="base URL of the beacon node API, ending with a slash (default: %(default)s)",
    )
    args = parser.parse_args()
    main(args.indices, eth2_api_url=args.api_url)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hey buddy, thanks for sharing!
Getting an error when trying to execute:
Traceback (most recent call last):
File "get_validator_duties.py", line 95, in <module>
main(args.indices)
File "get_validator_duties.py", line 22, in main
cur_epoch_data = api_post(f"validator/duties/attester/{epoch}", json.dumps(validators_indices))[
KeyError: 'data'
Any ideas?
Cheers