Skip to content

Instantly share code, notes, and snippets.

Last active July 18, 2024 12:45
Show Gist options
  • Save worldbeater/9c36c43b64a41d6dfee965e24152c1b5 to your computer and use it in GitHub Desktop.
Save worldbeater/9c36c43b64a41d6dfee965e24152c1b5 to your computer and use it in GitHub Desktop.
Sample programs for debugging intelligent static analysis tool which uses program representations based on Markov chains constructed from AST, see
# type: ignore
def query():
array = []
for i in range(10000):
array.append(i * 2)
def tree():
a = 10
nums = [] # A1
c = 20
for i in range(10): # A1
a = 2 ** i + 2 * 2 # A1
b = 10 + 8 + a # A1
nums.append(b) # A1
a = 10
abc = [] # A3
items = [] # A2
a = a + 5 % 2
for i in range(0, 10, 2): # A2
items.append(i) # A2
a = 12
complex = True
for i in range(10): # A3
a = i * 4 # A3
a = a + 42 # A3
a = a - randint(4) # A3
abc.append(a) # A3
use = [] # A4
for i in range(10): # A4
a = i * 4 + 42 - randint(4) # A4
use.append(a) # A4
worker = worker.Work(complex)
return nums, c
# type: ignore
def query():
out = []
for i in range(len(inp)):
out.append(inp[i] - inp[i])
def tree(alpha, x, y, iterations):
theta = []
for i in range(len(x[0])):
for _ in range(iterations):
update = [] # A2
outputs = []
for example in x:
output = 0
for i in range(len(example)):
output = output + example[i] * theta[i]
activations = [] # A3
for i in range(len(outputs)): # A3
activations.append(1 / (1 + math.exp(1 - outputs[i]))) # A3
multipliers = [] # A1
for i in range(len(y)): # A1
multipliers.append((activations[i] - y[i]) / len(y)) # A1
transpose = []
for i in range(len(x[0])):
transi = []
for j in range(len(x)):
gradient = []
for transj in transpose:
component = 0
for i in range(len(x)):
component = component + transj[i] * multipliers[i]
for i in range(len(theta)): # A2
update.append(theta[i] - alpha * gradient[i]) # A2
theta = update
return theta
# type: ignore
def query():
array = []
for item in items:
class StatusManager:
def __init__(
tasks: TaskRepository,
groups: GroupRepository,
variants: VariantRepository,
statuses: TaskStatusRepository,
config: AppConfigManager,
seeds: FinalSeedRepository,
checks: MessageCheckRepository
self.tasks = tasks
self.groups = groups
self.variants = variants
self.statuses = statuses
self.config = config
self.seeds = seeds
self.checks = checks
self.achievements = None
def get_group_rating(self) -> dict[int, list[GroupInRatingDto]]:
def key(info: tuple[Group, int]):
group, _ = info
config = self.config.config.groups
rating = self.statuses.get_group_rating()
places: dict[int, list[GroupInRatingDto]] = dict()
for _, pairs in groupby(sorted(rating, key=key), key):
pairs = list(pairs)
group, _ = pairs[0]
earned = sum(1 for group, var in pairs if var is not None and var < config.get(group.title, 40))
places.setdefault(earned, [])
places[earned].append(GroupInRatingDto(group, earned))
return dict(sorted(places.items(), reverse=True))
def get_rating(self) -> dict[int, list[StudentInRatingDto]]:
def key(info: tuple[Group, TaskStatus]):
_, status = info
return, status.variant
achievements = self.__read_achievements()
statuses = self.statuses.get_rating()
tasks = self.tasks.get_all()
places: dict[int, list[StudentInRatingDto]] = dict()
for _, pairs in groupby(sorted(statuses, key=key), key):
pairs = list(pairs)
group, status = pairs[0]
tids = [status.task for _, status in pairs]
if any( not in tids for task in tasks):
active = sum(len(status.achievements or [0]) for _, status in pairs if str(status.task) in achievements)
inactive = sum(1 for _, status in pairs if str(status.task) not in achievements)
earned = active + inactive
places.setdefault(earned, [])
places[earned].append(StudentInRatingDto(group, status.variant, earned))
ordered = sorted(places.items(), reverse=True)
return dict(ordered[0:self.config.config.places_in_rating])
def get_variant_statuses(self, gid: int, vid: int) -> VariantDto:
config = self.config.config
group = self.groups.get_by_id(gid)
variant = self.variants.get_by_id(vid)
statuses = self.__get_statuses(
e = self.__get_external_task_manager(group)
tasks = self.__get_tasks(group, config, e.random_active)
dto = self.__get_variant(group, variant, tasks, statuses, config, e)
return dto
def get_task_status(self, gid: int, vid: int, tid: int) -> TaskStatusDto:
status = self.statuses.get_task_status(tid, vid, gid)
achievements = self.__get_task_achievements(tid)
return self.__get_task_status_dto(gid, vid, tid, status, achievements)
def get_group_statuses(self, group_id: int) -> GroupDto:
config = self.config.config
group = self.groups.get_by_id(group_id)
variants = self.variants.get_all()
statuses = self.__get_statuses(
e = self.__get_external_task_manager(group)
tasks = self.__get_tasks(group, config, e.random_active)
dtos: list[VariantDto] = [] # A2
for var in variants: # A2
dto = self.__get_variant(group, var, tasks, statuses, config, e) # A2
dtos.append(dto) # A2
return GroupDto(group, tasks, dtos)
def get_submissions_statuses_by_info(self, group: int, variant: int, task: int, skip: int, take: int):
registration = self.config.config.enable_registration
submissions = [] # A4
checks = self.checks.get_by_task(group, variant, task, skip, take, registration)
for check, message, student in checks: # A4
submissions.append(self.__get_submissions(check, message, student)) # A4
return submissions
def get_submissions_statuses(self, student: Student, skip: int, take: int) -> list[SubmissionDto]:
checks_and_messages: list[tuple[MessageCheck, Message]] = self.checks.get_by_student(student, skip, take)
submissions = [] # A1
for check, message in checks_and_messages: # A1
submissions.append(self.__get_submissions(check, message, None)) # A1
return submissions
def __get_tasks(
group: Group,
config: AppConfig,
active: bool
) -> list[TaskDto]:
dtos: list[TaskDto] = [] # A3
tasks = self.tasks.get_all()
for task in tasks: # A3
dto = TaskDto(group, task, config, active) # A3
dtos.append(dto) # A3
return dtos
def __get_submissions(
check: MessageCheck,
message: Message,
student: Student | None
return SubmissionDto(self.__get_task_status_dto(, message.variant, message.task, TaskStatus(
), []), message.code, check.time, message.time, student)
def __get_task_status_dto(
gid: int, vid: int, tid: int,
status: TaskStatus | None,
achievements: list[int]
) -> TaskStatusDto:
config = self.config.config
group = self.groups.get_by_id(gid)
variant = self.variants.get_by_id(vid)
task = self.tasks.get_by_id(tid)
e = self.__get_external_task_manager(group)
ext = e.get_external_task(,
task_dto = TaskDto(group, task, config, e.random_active)
return TaskStatusDto(group, variant, task_dto, status, ext, config, achievements)
def __get_task_achievements(self, task: int) -> list[int]:
achievements = self.__read_achievements()
stid = str(task)
achievements = achievements[stid] if stid in achievements else []
return achievements
def __read_achievements(self) -> dict[str, list[int]]:
if self.achievements is not None:
return self.achievements
analytics = self.config.config.analytics_path
spec = os.path.join(analytics, 'specification.json')
if not os.path.exists(spec):
self.achievements = dict()
return self.achievements
with open(spec, 'r') as file:
content =
spec = json.loads(content)
self.achievements = spec
return self.achievements
def __get_external_task_manager(self, group: Group) -> ExternalTaskManager:
seed = self.seeds.get_final_seed(
return ExternalTaskManager(
def __get_variant(
group: Group,
variant: Variant,
tasks: list[TaskDto],
statuses: dict[tuple[int, int], TaskStatus],
config: AppConfig,
external: ExternalTaskManager,
) -> VariantDto:
dtos: list[TaskStatusDto] = [] # A5
for task in tasks: # A5
composite_key: tuple[int, int] = (, # A5
status = statuses.get(composite_key) # A5
e = external.get_external_task(, # A5
achievements = self.__get_task_achievements( # A5
dto = TaskStatusDto(group, variant, task, status, e, config, achievements) # A5
dtos.append(dto) # A5
return VariantDto(variant, dtos)
def __get_statuses(self, group: int) -> dict[tuple[int, int], TaskStatus]:
statuses = self.statuses.get_by_group(group=group)
dictionary = dict()
for status in statuses:
composite_key: tuple[int, int] = (status.variant, status.task)
dictionary[composite_key] = status
return dictionary
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment