Skip to content

Instantly share code, notes, and snippets.

@worldbeater
Last active July 18, 2024 12:45
Show Gist options
  • Save worldbeater/9c36c43b64a41d6dfee965e24152c1b5 to your computer and use it in GitHub Desktop.
Save worldbeater/9c36c43b64a41d6dfee965e24152c1b5 to your computer and use it in GitHub Desktop.
Sample programs for debugging intelligent static analysis tool which uses program representations based on Markov chains constructed from AST, see https://vestnik.rsreu.ru/ru/archive/2023/vypusk-86/1455-1995-4565-2023-86-96-109
# type: ignore
def query():
array = []
for i in range(10000):
array.append(i * 2)
def tree():
a = 10
nums = [] # A1
c = 20
for i in range(10): # A1
a = 2 ** i + 2 * 2 # A1
b = 10 + 8 + a # A1
nums.append(b) # A1
a = 10
abc = [] # A3
items = [] # A2
a = a + 5 % 2
for i in range(0, 10, 2): # A2
items.append(i) # A2
a = 12
complex = True
for i in range(10): # A3
a = i * 4 # A3
a = a + 42 # A3
a = a - randint(4) # A3
abc.append(a) # A3
use = [] # A4
for i in range(10): # A4
a = i * 4 + 42 - randint(4) # A4
use.append(a) # A4
worker = worker.Work(complex)
worker.run(worker.config)
return nums, c
# type: ignore
def query():
out = []
for i in range(len(inp)):
out.append(inp[i] - inp[i])
def tree(alpha, x, y, iterations):
theta = []
for i in range(len(x[0])):
theta.append(0)
for _ in range(iterations):
update = [] # A2
outputs = []
for example in x:
output = 0
for i in range(len(example)):
output = output + example[i] * theta[i]
outputs.append(output)
activations = [] # A3
for i in range(len(outputs)): # A3
activations.append(1 / (1 + math.exp(1 - outputs[i]))) # A3
multipliers = [] # A1
for i in range(len(y)): # A1
multipliers.append((activations[i] - y[i]) / len(y)) # A1
transpose = []
for i in range(len(x[0])):
transi = []
for j in range(len(x)):
transi.append(x[j][i])
transpose.append(transi)
gradient = []
for transj in transpose:
component = 0
for i in range(len(x)):
component = component + transj[i] * multipliers[i]
gradient.append(component)
for i in range(len(theta)): # A2
update.append(theta[i] - alpha * gradient[i]) # A2
theta = update
return theta
# type: ignore
def query():
array = []
for item in items:
array.append(call(item.value))
class StatusManager:
def __init__(
self,
tasks: TaskRepository,
groups: GroupRepository,
variants: VariantRepository,
statuses: TaskStatusRepository,
config: AppConfigManager,
seeds: FinalSeedRepository,
checks: MessageCheckRepository
):
self.tasks = tasks
self.groups = groups
self.variants = variants
self.statuses = statuses
self.config = config
self.seeds = seeds
self.checks = checks
self.achievements = None
def get_group_rating(self) -> dict[int, list[GroupInRatingDto]]:
def key(info: tuple[Group, int]):
group, _ = info
return group.id
config = self.config.config.groups
rating = self.statuses.get_group_rating()
places: dict[int, list[GroupInRatingDto]] = dict()
for _, pairs in groupby(sorted(rating, key=key), key):
pairs = list(pairs)
group, _ = pairs[0]
earned = sum(1 for group, var in pairs if var is not None and var < config.get(group.title, 40))
places.setdefault(earned, [])
places[earned].append(GroupInRatingDto(group, earned))
return dict(sorted(places.items(), reverse=True))
def get_rating(self) -> dict[int, list[StudentInRatingDto]]:
def key(info: tuple[Group, TaskStatus]):
_, status = info
return status.group, status.variant
achievements = self.__read_achievements()
statuses = self.statuses.get_rating()
tasks = self.tasks.get_all()
places: dict[int, list[StudentInRatingDto]] = dict()
for _, pairs in groupby(sorted(statuses, key=key), key):
pairs = list(pairs)
group, status = pairs[0]
tids = [status.task for _, status in pairs]
if any(task.id not in tids for task in tasks):
continue
active = sum(len(status.achievements or [0]) for _, status in pairs if str(status.task) in achievements)
inactive = sum(1 for _, status in pairs if str(status.task) not in achievements)
earned = active + inactive
places.setdefault(earned, [])
places[earned].append(StudentInRatingDto(group, status.variant, earned))
ordered = sorted(places.items(), reverse=True)
return dict(ordered[0:self.config.config.places_in_rating])
def get_variant_statuses(self, gid: int, vid: int) -> VariantDto:
config = self.config.config
group = self.groups.get_by_id(gid)
variant = self.variants.get_by_id(vid)
statuses = self.__get_statuses(group.id)
e = self.__get_external_task_manager(group)
tasks = self.__get_tasks(group, config, e.random_active)
dto = self.__get_variant(group, variant, tasks, statuses, config, e)
return dto
def get_task_status(self, gid: int, vid: int, tid: int) -> TaskStatusDto:
status = self.statuses.get_task_status(tid, vid, gid)
achievements = self.__get_task_achievements(tid)
return self.__get_task_status_dto(gid, vid, tid, status, achievements)
def get_group_statuses(self, group_id: int) -> GroupDto:
config = self.config.config
group = self.groups.get_by_id(group_id)
variants = self.variants.get_all()
statuses = self.__get_statuses(group.id)
e = self.__get_external_task_manager(group)
tasks = self.__get_tasks(group, config, e.random_active)
dtos: list[VariantDto] = [] # A2
for var in variants: # A2
dto = self.__get_variant(group, var, tasks, statuses, config, e) # A2
dtos.append(dto) # A2
return GroupDto(group, tasks, dtos)
def get_submissions_statuses_by_info(self, group: int, variant: int, task: int, skip: int, take: int):
registration = self.config.config.enable_registration
submissions = [] # A4
checks = self.checks.get_by_task(group, variant, task, skip, take, registration)
for check, message, student in checks: # A4
submissions.append(self.__get_submissions(check, message, student)) # A4
return submissions
def get_submissions_statuses(self, student: Student, skip: int, take: int) -> list[SubmissionDto]:
checks_and_messages: list[tuple[MessageCheck, Message]] = self.checks.get_by_student(student, skip, take)
submissions = [] # A1
for check, message in checks_and_messages: # A1
submissions.append(self.__get_submissions(check, message, None)) # A1
return submissions
def __get_tasks(
self,
group: Group,
config: AppConfig,
active: bool
) -> list[TaskDto]:
dtos: list[TaskDto] = [] # A3
tasks = self.tasks.get_all()
for task in tasks: # A3
dto = TaskDto(group, task, config, active) # A3
dtos.append(dto) # A3
return dtos
def __get_submissions(
self,
check: MessageCheck,
message: Message,
student: Student | None
):
return SubmissionDto(self.__get_task_status_dto(message.group, message.variant, message.task, TaskStatus(
task=message.task,
variant=message.variant,
group=message.group,
time=check.time,
code=message.code,
ip=message.ip,
output=check.output,
status=check.status,
achievements=[]
), []), message.code, check.time, message.time, student)
def __get_task_status_dto(
self,
gid: int, vid: int, tid: int,
status: TaskStatus | None,
achievements: list[int]
) -> TaskStatusDto:
config = self.config.config
group = self.groups.get_by_id(gid)
variant = self.variants.get_by_id(vid)
task = self.tasks.get_by_id(tid)
e = self.__get_external_task_manager(group)
ext = e.get_external_task(task.id, variant.id)
task_dto = TaskDto(group, task, config, e.random_active)
return TaskStatusDto(group, variant, task_dto, status, ext, config, achievements)
def __get_task_achievements(self, task: int) -> list[int]:
achievements = self.__read_achievements()
stid = str(task)
achievements = achievements[stid] if stid in achievements else []
return achievements
def __read_achievements(self) -> dict[str, list[int]]:
if self.achievements is not None:
return self.achievements
analytics = self.config.config.analytics_path
spec = os.path.join(analytics, 'specification.json')
if not os.path.exists(spec):
self.achievements = dict()
return self.achievements
with open(spec, 'r') as file:
content = file.read()
spec = json.loads(content)
self.achievements = spec
return self.achievements
def __get_external_task_manager(self, group: Group) -> ExternalTaskManager:
seed = self.seeds.get_final_seed(group.id)
return ExternalTaskManager(
group=group,
seed=seed,
tasks=self.tasks,
groups=self.groups,
variants=self.variants,
config=self.config.config,
)
def __get_variant(
self,
group: Group,
variant: Variant,
tasks: list[TaskDto],
statuses: dict[tuple[int, int], TaskStatus],
config: AppConfig,
external: ExternalTaskManager,
) -> VariantDto:
dtos: list[TaskStatusDto] = [] # A5
for task in tasks: # A5
composite_key: tuple[int, int] = (variant.id, task.id) # A5
status = statuses.get(composite_key) # A5
e = external.get_external_task(task.id, variant.id) # A5
achievements = self.__get_task_achievements(task.id) # A5
dto = TaskStatusDto(group, variant, task, status, e, config, achievements) # A5
dtos.append(dto) # A5
return VariantDto(variant, dtos)
def __get_statuses(self, group: int) -> dict[tuple[int, int], TaskStatus]:
statuses = self.statuses.get_by_group(group=group)
dictionary = dict()
for status in statuses:
composite_key: tuple[int, int] = (status.variant, status.task)
dictionary[composite_key] = status
return dictionary
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment