Код: Выделить всё
import copy
import itertools
import math
import numpy as np
import pytest
import random
import sympy
import textdistance
import time
import typing
eps = 1e-6
class Dict(dict):
def __init__(self, d: dict = {}):
super().__init__(d)
self.removedItems = {}
self.addedItems = {}
def __sub__(self, d):
newDict = copy.deepcopy(self)
for k, v in self.items():
if (not k in d.keys()):
newDict.removedItems[k] = v
else:
del newDict[k]
for k, v in d.items():
if (not k in self.keys()):
newDict.addedItems[k] = v
return newDict
def __add__(self, d):
newDict = copy.deepcopy(self)
for k, v in d.removedItems.items():
del newDict[k]
for k, v in d.addedItems.items():
newDict[k] = v
return newDict
def index(l: list, value):
bestCost = math.inf
bestIndex = -1
for i in range(0, len(l)):
cost = heuristic(l[i], value)
if (cost < bestCost):
bestCost = cost
bestIndex = i
return bestIndex
def fit(target: object, function, parameters: list[list], limit: int = 1e5, percentage: float = 0.001) -> list:
initialCells = []
barycenter = []
for p in parameters:
barycenter.append(p[len(p) // 2])
initialCells.append(barycenter)
"""
count = 1
for p in parameters:
count *= len(p)
while (len(initialCells) < percentage * count):
point = []
for p in parameters:
point.append(random.choice(p))
initialCells.append(point)
"""
visitedCells = []
cells = []
for c in initialCells:
directions = []
for i in range(0, len(c)):
direction = []
for v in c:
if (isinstance(v, np.ndarray)):
x = copy.deepcopy(v)
x.fill(0)
else:
x = type(v)()
direction.append(x)
j = index(parameters[i], c[i])
direction[i] = parameters[i][j - 1] - c[i]
directions.append(direction)
direction = copy.deepcopy(direction)
direction[i] = parameters[i][(j + 1) % len(parameters[i])] - c[i]
directions.append(direction)
try:
value = function(c)
cost = heuristic(value, target)
cells.append((cost, c, directions))
except:
pass
cells = sorted(cells, key = lambda x: x[0])
if (len(cells) == 0):
return None
bestCost = cells[0][0]
bestParams = cells[0][1]
while (len(cells)):
cell = cells[0]
del cells[0]
if (index(visitedCells, cell) != -1):
continue
visitedCells.append(cell)
if (len(visitedCells) > limit):
break
for direction in cell[2]:
c = copy.deepcopy(cell[1])
for i in range(0, len(c)):
c[i] += direction[i]
d = []
for i in range(0, len(c)):
direction = []
for v in c:
if (isinstance(v, np.ndarray)):
x = copy.deepcopy(v)
x.fill(0)
else:
x = type(v)()
direction.append(x)
j = index(parameters[i], c[i])
direction[i] = parameters[i][j - 1] - c[i]
d.append(direction)
direction = copy.deepcopy(direction)
direction[i] = parameters[i][(j + 1) % len(parameters[i])] - c[i]
d.append(direction)
try:
value = function(c)
cost = heuristic(value, target)
if (cost < bestCost):
bestCost = cost
bestParams = c
if (cost < eps):
return bestParams
cells.append((cost, c, d))
except:
pass
cells = sorted(cells, key = lambda x: x[0])
cells = cells[:50]
return bestParams
def heuristic(val, target):
if (isinstance(target, str)):
s = val
cost = 0 if isinstance(val, str) else 1
try:
s = str(val)
except:
pass
if (isinstance(s, str)):
a, b = s, target
if (b in a):
a, b = b, a
if (a in b):
return cost + 1 - 1 / target.count(a) + 1 / (1 + len(a)) - 1 / (1 + len(b))
return cost + 1 / (1 + len(a)) - 1 / (1 + len(b)) + textdistance.Levenshtein().distance(a, b)
else:
return abs(hash(val) - hash(target))
elif (isinstance(target, sympy.Expr)):
if (isinstance(val, sympy.Expr)):
if (val == target):
return 0
else:
return heuristic(str(val), str(target))
else:
return 1 + heuristic(str(val), str(target))
try:
v = np.array(val)
t = np.array(target)
if (v.shape != t.shape):
return 100 + abs(np.sum(v) - np.sum(t))
return np.linalg.norm(np.subtract(v, t))
except:
try:
return 999.0
#return abs(hash(val) - hash(target))
except:
return 999.0
def toTuple(x) -> tuple:
if (isinstance(x, dict)):
return tuple(map(tuple, x.items()))
if (isinstance(x, np.ndarray) or isinstance(x, list) or isinstance(x, set)):
l = []
for v in x:
l.append(normalizedOutput(v))
return tuple(l)
return tuple(x)
def normalizedOutput(output):
if (isinstance(output, list) or isinstance(output, np.ndarray) or isinstance(output, set) or isinstance(output, dict)):
return toTuple(output)
return output
class Neuron:
def __init__(self, name, function, inputTypes: tuple[type], outputType: type):
self.name = name
self.function = function
self.inputTypes = inputTypes
self.outputType = outputType
def __eq__(self, other):
if (isinstance(other, Neuron)):
return self.name == other.name and self.function == other.function and self.inputTypes == other.inputTypes and self.outputType == other.outputType
return False
def __hash__(self):
return hash(self.function)
class Connection:
def __init__(self, neuron: Neuron, inputs: list):
self.neuron = neuron
self.inputs = list(inputs)
for i in range(0, len(self.inputs)):
if (isinstance(self.inputs[i], Connection)):
self.inputs[i] = copy.copy(self.inputs[i])
def __eq__(self, other):
if (isinstance(other, Connection)):
return self.neuron == other.neuron and self.inputs == other.inputs
return False
def __hash__(self):
return hash(self.neuron) + int(np.sum([hash(x) for x in self.inputs]))
def applyInputs(self, inputs: list):
index = 0
for i in range(0, len(self.inputs)):
input_ = self.inputs[i]
if (isinstance(input_, Connection)):
size = len(input_.inputTypes())
input_.applyInputs(inputs[index:index + size])
index += size
else:
if (index < len(inputs)):
if (isinstance(inputs[index], Connection)):
assert(inputs[index].neuron.outputType == self.neuron.inputTypes[i])
elif (isinstance(inputs[index], Neuron)):
assert(inputs[index].outputType == self.neuron.inputTypes[i])
elif (isinstance(inputs[index], type)):
assert(inputs[index] == self.neuron.inputTypes[i])
else:
assert(type(inputs[index]) == self.neuron.inputTypes[i])
if (isinstance(inputs[index], Connection)):
self.inputs[i] = copy.copy(inputs[index])
else:
self.inputs[i] = inputs[index]
index += 1
def output(self, inputs: list = None):
args = []
index = 0
for i in range(0, len(self.inputs)):
input_ = self.inputs[i]
if (isinstance(input_, Connection)):
if (inputs == None):
args.append(input_.output())
else:
size = len(input_.inputTypes())
args.append(input_.output(inputs[index:index + size]))
index += size
else:
if (inputs != None):
if (index < len(inputs)):
args.append(inputs[index])
index += 1
elif (isinstance(input_, Neuron)):
args.append(input_.function())
else:
args.append(input_)
return self.neuron.function(*args)
def inputTypes(self) -> list:
types = []
for input_ in self.inputs:
if (isinstance(input_, Connection)):
types.extend(input_.inputTypes())
elif (isinstance(input_, Neuron)):
types.extend(input_.inputTypes)
elif (isinstance(input_, typing.GenericAlias)):
types.append(input_)
elif (isinstance(input_, type)):
types.append(input_)
else:
types.append(type(input_))
return types
def cost(self) -> int:
cost = 0
for i in self.inputs:
cost += 1
if (type(i) is Connection):
cost += i.cost()
return cost
def toStr(self) -> str:
args = []
for i in self.inputs:
if (type(i) is Connection):
args.append(i.toStr())
elif (type(i) is Neuron):
args.append(i.name)
else:
args.append(str(i))
s = self.neuron.name
if (len(self.neuron.inputTypes)):
s += "(" + ", ".join(args) + ")"
return s
class Brain:
def __init__(self, neurons: list[Neuron]):
self.neurons = neurons
def directLearn(self, targets: list = [], targetTypes: list = None, level: int = 3):
if (targetTypes == None):
targetTypes = []
for t in targets:
targetTypes.append(type(t))
assert(len(targets) == len(targetTypes))
parameters = dict()
types = dict()
for neuron in self.neurons:
if (len(neuron.inputTypes) == 0):
d = parameters.get(neuron.outputType, dict())
value = neuron.function()
types[neuron.outputType] = neuron.outputType
value = normalizedOutput(value)
l = d.get(value, [])
l.append(neuron)
d[value] = l
parameters[neuron.outputType] = d
learnedConnections = [None] * len(targets)
connectionMapping = {}
connections = set()
for neuron in self.neurons:
if (len(neuron.inputTypes)):
connections.add(Connection(neuron, neuron.inputTypes))
for ll in range(0, level):
mapping = copy.deepcopy(connectionMapping)
for connection in connections:
l = mapping.get(connection.neuron.outputType, set())
args = connection.neuron.inputTypes
args = [[x] for x in args]
for j in range(0, len(connection.neuron.inputTypes)):
args[j].extend(connectionMapping.get(connection.neuron.inputTypes[j], []))
if (len(args)):
product = itertools.product(*args)
for p in product:
c = copy.deepcopy(connection)
c.applyInputs(p)
l.add(c)
if (len(l)):
if (connection.neuron.outputType in targetTypes):
newL = set()
for c in l:
args = []
connectionInputTypes = c.inputTypes()
for t in connectionInputTypes:
l = list(parameters[t].keys())
for i in range(0, len(l)):
if (types[t] == np.ndarray):
l[i] = np.asarray(l[i])
else:
l[i] = types[t](l[i])
args.append(l)
tt = [x == connection.neuron.outputType for x in targetTypes]
evalConnections = []
for j in range(0, len(tt)):
if (tt[j]):
params = fit(targets[j], c.output, args)
if (params == None):
continue
inputs = []
for k in range(0, len(connectionInputTypes)):
inputs.append(parameters[connectionInputTypes[k]][normalizedOutput(params[k])][0])
try:
value = c.output(params)
evalConnections.append((heuristic(value, targets[j]), c, inputs))
for kk in range(0, len(inputs)):
assert(inputs[kk].outputType == connectionInputTypes[kk])
except:
pass
evalConnections = sorted(evalConnections, key = lambda x: (x[0], x[1].cost()))
if (len(evalConnections)):
h = math.inf
if (learnedConnections[j] != None):
s = learnedConnections[j].toStr()
h = heuristic(learnedConnections[j].output(), targets[j])
c = copy.deepcopy(evalConnections[0][1])
c.applyInputs(evalConnections[0][2])
if (h > heuristic(c.output(), targets[j])):
learnedConnections[j] = c
for x in evalConnections[:100]:
newL.add(x[1])
l = newL
mapping[connection.neuron.outputType] = l
h = [math.inf] * len(learnedConnections)
for j in range(0, len(learnedConnections)):
if (learnedConnections[j] != None):
h[j] = heuristic(targets[j], learnedConnections[j].output())
if (np.sum(h) < eps):
ll = level
break
connectionMapping = mapping
connections = []
for k, v in connectionMapping.items():
connections += v
connections = set(connections)
return learnedConnections
def taskPairs(task: str) -> tuple:
import json
import urllib.request
url = urllib.request.urlopen("https://raw.githubusercontent.com/arcprize/ARC-AGI-2/refs/heads/main/data/training/" + task + ".json")
data = json.loads(url.read().decode())
train = data["train"]
trainPairs = []
for v in train:
trainPairs.append((np.array(v["input"]), np.array(v["output"])))
test = data["test"]
testPairs = []
for v in test:
testPairs.append((np.array(v["input"]), np.array(v["output"])))
return (trainPairs, testPairs)
def inputOutputPair(pairs):
inputs = []
outputs = []
for p in pairs:
inputs.append(p[0])
outputs.append(p[1])
return (inputs, outputs)
def map_(x: np.ndarray, mapping: Dict):
y = copy.deepcopy(x)
for i in range(0, y.shape[0]):
for j in range(0, y.shape[1]):
y[i, j] = mapping.get(y[i, j], y[i, j])
return y
def associate(mapping: Dict, a, b):
m = copy.deepcopy(mapping)
m[a] = b
return m
def test_task0d3d703e(): #Color mapping
digitNeurons = []
for i in range(0, 10):
digitNeurons.append(Neuron(str(i), lambda i = i: i, [], int))
mapNeuron = Neuron("map", lambda x, m: [map_(v, m) for v in x], [np.ndarray, Dict], np.ndarray)
mappingNeuron = Neuron("mapping", lambda: Dict(), [], Dict)
associateNeuron = Neuron("associate", lambda mapping, a, b: associate(mapping, a, b), [Dict, int, int], Dict)
inputNeuron = Neuron("input", lambda: np.array([]), [], np.ndarray)
neurons = []
neurons += digitNeurons
neurons.append(mapNeuron)
neurons.append(mappingNeuron)
neurons.append(associateNeuron)
neurons.append(inputNeuron)
brain = Brain(neurons)
trainPairs, testPairs = taskPairs("0d3d703e")
input_, output = inputOutputPair(trainPairs)
input_ = np.array(input_)
output = np.array(output)
inputNeuron.function = lambda input_ = input_: input_
connections = brain.directLearn([output], level = 3)#5)
connection = connections[0]
print(connection.toStr())
input_, output = inputOutputPair(testPairs)
input_ = np.array(input_)
output = np.array(output)
inputNeuron.function = lambda input_ = input_: input_
assert(not heuristic(connection.output(), output))
Код: Выделить всё
__________________________________________________ test_task0d3d703e __________________________________________________
def test_task0d3d703e(): #Color mapping
digitNeurons = []
for i in range(0, 10):
digitNeurons.append(Neuron(str(i), lambda i = i: i, [], int))
mapNeuron = Neuron("map", lambda x, m: [map_(v, m) for v in x], [np.ndarray, Dict], np.ndarray)
mappingNeuron = Neuron("mapping", lambda: Dict(), [], Dict)
associateNeuron = Neuron("associate", lambda mapping, a, b: associate(mapping, a, b), [Dict, int, int], Dict)
inputNeuron = Neuron("input", lambda: np.array([]), [], np.ndarray)
neurons = []
neurons += digitNeurons
neurons.append(mapNeuron)
neurons.append(mappingNeuron)
neurons.append(associateNeuron)
neurons.append(inputNeuron)
brain = Brain(neurons)
trainPairs, testPairs = taskPairs("0d3d703e")
input_, output = inputOutputPair(trainPairs)
input_ = np.array(input_)
output = np.array(output)
inputNeuron.function = lambda input_ = input_: input_
> connections = brain.directLearn([output], level = 3)#5)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
mcve.py:625:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
mcve.py:517: in directLearn
c.applyInputs(evalConnections[0][2])
mcve.py:312: in applyInputs
input_.applyInputs(inputs[index:index + size])
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self =
inputs = [, ]
def applyInputs(self, inputs: list):
index = 0
for i in range(0, len(self.inputs)):
input_ = self.inputs[i]
if (isinstance(input_, Connection)):
size = len(input_.inputTypes())
input_.applyInputs(inputs[index:index + size])
index += size
else:
if (index < len(inputs)):
if (isinstance(inputs[index], Connection)):
assert(inputs[index].neuron.outputType == self.neuron.inputTypes[i])
elif (isinstance(inputs[index], Neuron)):
> assert(inputs[index].outputType == self.neuron.inputTypes[i])
E AssertionError: assert ==
E + where = .outputType
mcve.py:319: AssertionError

Подробнее здесь: https://stackoverflow.com/questions/798 ... ction-tree
Мобильная версия