remove recommendation_systems

This commit is contained in:
l00475793
2020-12-01 22:09:33 +08:00
committed by wangtq
parent 7dc1caa55c
commit 1dbddcc7cc
12 changed files with 0 additions and 101127 deletions

View File

@ -42,8 +42,6 @@ cp -r * YOUR_MADLIB_SOURCE_CODE/src/ports/postgres/modules
THEN, add following to `src/config/Modules.yml` to register those modules.
```
- name: recommendation_systems
depends: ['utilities']
- name: agglomerative_clustering
depends: ['utilities']
- name: xgboost_gs

View File

@ -1,261 +0,0 @@
# -*- coding:utf-8 -*-
# Copyright (c) 2020 Huawei Technologies Co.,Ltd.
#
# openGauss is licensed under Mulan PSL v2.
# You can use this software according to the terms
# and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
#
# http://license.coscl.org.cn/MulanPSL2
#
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS,
# WITHOUT WARRANTIES OF ANY KIND,
# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
# See the Mulan PSL v2 for more details.
# ----------------------------------------------------------------------------
from collections import defaultdict
from operator import itemgetter
import math
import random
import plpy
import json
from utilities.validate_args import quote_ident
from utilities.validate_args import table_exists
def train_validate(train_table, user_varname, item_varname,
                   rating_varname, k_sim_item, use_iuf_similarity,
                   test_table, n_rec_item):
    """Check itemcf_train arguments and normalize the optional ones.

    Aborts via plpy.error when a required argument is missing or when
    k_sim_item < 1.  A falsy use_iuf_similarity becomes False, a falsy
    test_table becomes None, and a missing n_rec_item defaults to 10
    (an explicit non-positive n_rec_item is rejected).

    Returns the normalized (use_iuf_similarity, test_table, n_rec_item).
    """
    required = (train_table, user_varname, item_varname, rating_varname)
    if not all(required) or not k_sim_item or k_sim_item < 1:
        plpy.error("The input parameters are invalid.")
    # Only falsy values are coerced; truthy values pass through unchanged.
    if not use_iuf_similarity:
        use_iuf_similarity = False
    test_table = test_table or None
    if n_rec_item or n_rec_item == 0:
        # Caller supplied a value (0 counts as supplied): must be positive.
        if n_rec_item <= 0:
            plpy.error("The input parameters are invalid.")
    else:
        n_rec_item = 10
    return use_iuf_similarity, test_table, n_rec_item
def train(schema_madlib, train_table, user_varname, item_varname,
          rating_varname, k_sim_item, use_iuf_similarity,
          test_table, n_rec_item, **kwargs):
    """Train an item-based collaborative-filtering recommender in-database.

    Reads (user, item, rating) rows from train_table, builds (or reuses) an
    item-item similarity table, scores every not-yet-rated item for each
    user, and stores the scores as JSON in train_table + '_itemcf_score'.
    When test_table is given, the top n_rec_item recommendations per user
    are evaluated and precision/recall/coverage/popularity are reported
    via plpy.info.

    :param schema_madlib: MADlib schema name (unused here; part of the
        SQL-generated call signature)
    :param train_table: table holding (user, item, rating) rows
    :param user_varname: user column name in train_table
    :param item_varname: item column name in train_table
    :param rating_varname: rating column name in train_table
    :param k_sim_item: number of most-similar items used per rated item
    :param use_iuf_similarity: use the IUF-weighted similarity variant
    :param test_table: optional table with held-out (user, item) pairs
    :param n_rec_item: recommendations per user when testing

    NOTE(review): quote_ident covers the column names, but train_table and
    test_table are interpolated into SQL unquoted -- callers must pass
    trusted table names.
    """
    # 0) Validate input parameter
    use_iuf_similarity, test_table, n_rec_item = train_validate(train_table,
        user_varname, item_varname, rating_varname, k_sim_item,
        use_iuf_similarity, test_table, n_rec_item)
    # 1) Generate user-item-rating matrix
    sql = """select {user_varname}, {item_varname}, {rating_varname} from {train_table};""".format(
        user_varname=quote_ident(user_varname),
        item_varname=quote_ident(item_varname),
        rating_varname=quote_ident(rating_varname),
        train_table=train_table
    )
    results = plpy.execute(sql)
    user_item_rating_mat = defaultdict(dict)
    for result in results:
        user_item_rating_mat[result[user_varname]][result[item_varname]] = result[rating_varname]
    # 2) Generate item-item-similarity matrix
    # 2.1) Set item_item_sim_table name
    if use_iuf_similarity:
        item_item_sim_table = train_table+'_itemcf_iuf_sim'
    else:
        item_item_sim_table = train_table+'_itemcf_sim'
    # 2.2) First time generate item_item_sim_table
    # The similarity table is cached across calls: it is only rebuilt when
    # absent.  NOTE(review): a stale table from an earlier run with different
    # training data is silently reused -- drop it to force a rebuild.
    if not table_exists(item_item_sim_table):
        # 2.2.1) Get item popular (number of users who rated each item)
        item_popular = defaultdict(int)
        for user, items in user_item_rating_mat.items():
            for item in items:
                item_popular[item] += 1
        # 2.2.2) Make item_item_sim_mat
        # _val accumulates raw co-occurrence counts; _sum holds the
        # normalized (cosine-style) similarity derived from _val.
        item_item_sim_mat_val = {}
        item_item_sim_mat_sum = {}
        for user, items in user_item_rating_mat.items():
            for itemi in items:
                item_item_sim_mat_val.setdefault(itemi, defaultdict(float))
                item_item_sim_mat_sum.setdefault(itemi, defaultdict(float))
                len_itemi = item_popular[itemi]
                for itemj in items:
                    len_itemj = item_popular[itemj]
                    if itemi == itemj:
                        continue
                    if use_iuf_similarity:
                        # IUF: down-weight co-occurrences contributed by
                        # very active users (large len(items)).
                        item_item_sim_mat_val[itemi][itemj] += 1 / math.log(1 + len(items))
                        item_item_sim_mat_sum[itemi][itemj] = item_item_sim_mat_val[itemi][itemj]/math.sqrt(len_itemi*len_itemj)
                    else:
                        item_item_sim_mat_val[itemi][itemj] += 1
                        item_item_sim_mat_sum[itemi][itemj] = item_item_sim_mat_val[itemi][itemj]/math.sqrt(len_itemi*len_itemj)
        # 2.2.3) store item_item_sim_mat (one JSON row per item)
        sql = """drop table if exists {item_item_sim_table};
    create table {item_item_sim_table} (itemid integer, sim_item json);""".format(
            item_item_sim_table=item_item_sim_table
        )
        plpy.execute(sql)
        for itemid, items in item_item_sim_mat_sum.items():
            items_json = json.dumps(items)
            # $$ dollar-quoting protects the JSON payload from quote issues.
            sql = """insert into {item_item_sim_table} values ({itemid}, $${items_json}$$);""".format(
                item_item_sim_table=item_item_sim_table,
                itemid=itemid,
                items_json=items_json
            )
            plpy.execute(sql)
    # 2.3) Read item_item_sim_table
    sql = """select * from {item_item_sim_table};""".format(
        item_item_sim_table=item_item_sim_table,
    )
    results = plpy.execute(sql)
    item_item_sim_mat = {}
    for result in results:
        itemi = result['itemid']
        item_item_sim_mat.setdefault(itemi, defaultdict(float))
        items_json = json.loads(result['sim_item'])
        for itemj, itemi_itemj_sim in items_json.items():
            # JSON object keys come back as strings; restore int item ids.
            item_item_sim_mat[itemi][int(itemj)] = itemi_itemj_sim
    # 3) Generate user-item-score matrix
    # 3.1) Make user_item_score table: score(u, j) = sum over rated items i
    # of sim(i, j) * rating(u, i), restricted to the top-k similar items.
    user_item_score_mat = {}
    for user, items in user_item_rating_mat.items():
        user_item_score_mat.setdefault(user, defaultdict(float))
        for itemi, rating in items.items():
            for itemj, similarity in sorted(item_item_sim_mat[itemi].items(), key=itemgetter(1), reverse=True)[:k_sim_item]:
                if itemj in user_item_rating_mat[user]:
                    # Never recommend something the user already rated.
                    continue
                user_item_score_mat[user][itemj] += similarity * rating
    # 3.2) Store user_item_score_mat
    user_item_score_table = train_table + '_itemcf_score'
    sql = """drop table if exists {user_item_score_table};
    create table {user_item_score_table} (userid integer, scored_item json);""".format(
        user_item_score_table=user_item_score_table
    )
    plpy.execute(sql)
    for userid, items in user_item_score_mat.items():
        items_json = json.dumps(items)
        sql = """insert into {user_item_score_table} values ({userid}, $${items_json}$$);""".format(
            user_item_score_table=user_item_score_table,
            userid=userid,
            items_json=items_json
        )
        plpy.execute(sql)
    plpy.info('Training finish! Users\' scored items are stored in table ' + user_item_score_table)
    # 4) Test recommendation results
    if test_table:
        # 4.1) Do statistic over the TRAINING data (popularity, catalog size)
        item_set = set()
        item_popular = defaultdict(int)
        for _, items in user_item_rating_mat.items():
            for item in items:
                item_set.add(item)
                item_popular[item] += 1
        item_count = len(item_set)
        # 4.2) Read true data (held-out user/item interactions)
        sql = """select {user_varname}, {item_varname} from {test_table};""".format(
            user_varname=quote_ident(user_varname),
            item_varname=quote_ident(item_varname),
            test_table=test_table
        )
        results = plpy.execute(sql)
        user_item_true_mat = defaultdict(set)
        for result in results:
            user_item_true_mat[result[user_varname]].add(result[item_varname])
        # 4.3) Test: count hits among the top-n scored items of each user
        # that also appears in the test set.
        hit_count = 0
        rec_count = 0
        test_count = 0
        all_rec_items = set()
        popular_sum = 0
        for user in user_item_rating_mat:
            if user not in user_item_true_mat:
                continue
            true_items = user_item_true_mat[user]
            scored_items = user_item_score_mat[user]
            for item, _ in sorted(scored_items.items(), key=itemgetter(1), reverse=True)[:n_rec_item]:
                if item in true_items:
                    hit_count += 1
                all_rec_items.add(item)
                popular_sum += math.log(1 + item_popular[item])
            rec_count += n_rec_item
            test_count += len(true_items)
        # NOTE(review): if no training user appears in the test table,
        # rec_count/test_count stay 0 and these divisions raise.
        precision = hit_count / (1.0 * rec_count)
        recall = hit_count / (1.0 * test_count)
        coverage = len(all_rec_items) / (1.0 * item_count)
        popularity = popular_sum / (1.0 * rec_count)
        plpy.info('Testing finish! Precision:', precision, 'Recall:', recall, 'Coverage:', coverage, 'Popularity:', popularity)
def predict_validate(train_table, predict_table, user_varname, recommend_table, n_rec_item):
    """Abort via plpy.error unless every argument is supplied and n_rec_item >= 1."""
    names_ok = all((train_table, predict_table, user_varname, recommend_table))
    if not (names_ok and n_rec_item and n_rec_item >= 1):
        plpy.error("The input parameters are invalid.")
def predict(schema_madlib, train_table, predict_table, user_varname, recommend_table, n_rec_item, **kwargs):
    """Recommend the top-n scored items for every user in predict_table.

    Reads the scores produced by itemcf training from
    train_table + '_itemcf_score', keeps the n_rec_item best items per
    user, and writes them into recommend_table
    (userid integer, recommend_item integer[]).  Users absent from the
    score table are reported as cold-start users and skipped.

    :param schema_madlib: MADlib schema name (unused here)
    :param train_table: table itemcf_train was run on (score-table prefix)
    :param predict_table: table listing the users to recommend for
    :param user_varname: user column name in predict_table
    :param recommend_table: output table name (dropped and recreated)
    :param n_rec_item: number of items to recommend per user
    """
    # 0) Validate input parameters
    predict_validate(train_table, predict_table, user_varname, recommend_table, n_rec_item)
    # 1) Read user-item-score table
    sql = """select userid, scored_item from {user_item_score_table};""".format(
        user_item_score_table=train_table+'_itemcf_score'
    )
    results = plpy.execute(sql)
    user_item_score_mat = {}
    for result in results:
        userid = result['userid']
        user_item_score_mat.setdefault(userid, [])
        # scored_item is a JSON object {itemid: score}; keep the n best ids.
        scored_items = json.loads(result['scored_item'])
        for itemid, _ in sorted(scored_items.items(), key=itemgetter(1), reverse=True)[:n_rec_item]:
            user_item_score_mat[userid].append(itemid)
    # 2) Read predict table
    sql = """select {user_varname} from {predict_table};""".format(
        user_varname=quote_ident(user_varname),
        predict_table=predict_table
    )
    results = plpy.execute(sql)
    users = set()
    for result in results:
        users.add(result[user_varname])
    # 3) Generate recommend table
    sql = """drop table if exists {recommend_table};
    create table {recommend_table} (userid integer, recommend_item integer[]);""".format(
        recommend_table=recommend_table
    )
    plpy.execute(sql)
    for user in users:
        if user not in user_item_score_mat:
            plpy.info("Userid", user, "is a cold-start user!")
            continue
        # JSON keys are strings; cast back to int before building the
        # SQL array literal (list repr matches array[...] syntax).
        recommend_item = [int(x) for x in user_item_score_mat[user]]
        sql = """insert into {recommend_table} values ({userid}, array{recommend_item});""".format(
            recommend_table=recommend_table,
            userid=user,
            recommend_item=recommend_item
        )
        plpy.execute(sql)
    plpy.info('Recommending finish! Users\' recommended items are stored in table ' + recommend_table)

View File

@ -1,92 +0,0 @@
/*
* Portions Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*/
-------------------------------------
-- Build item-based CF in database --
-------------------------------------
---------------------------------------------------------------------------------------------------
-- Note: This module allows you to use SQL to call item-based collaborative filtering algorithm. --
---------------------------------------------------------------------------------------------------
m4_include(`SQLCommon.m4')
-----------------
-- train & test--
-----------------
----------------------------------------------------------------------------------------------------------
-- train & test -- intermediate output table -------------------------------------------------------------
-- 1) train_table + '_itemcf_sim': Table that stores all items and their similar items -------------------
-- 2) train_table + '_itemcf_iuf_sim': Table that stores all items and their similar items by using iuf --
-- 3) train_table + '_itemcf_score': Table that stores all users and their scored items ------------------
----------------------------------------------------------------------------------------------------------
-- Full 8-argument entry point: dispatches into the Python train() of the
-- item_based_cf module.
CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.itemcf_train(
    train_table TEXT, -- Name of the table containing the train data
    user_varname TEXT, -- Name of the user column from the train table
    item_varname TEXT, -- Name of the item column from the train table
    rating_varname TEXT, -- Name of the rating column from the train table
    k_sim_item INTEGER, -- Parameter for selecting the top-k similar items
    use_iuf_similarity BOOL, -- (optional, default:False) Determine whether to use the improved algorithm version
    test_table TEXT, -- (optional, default:None) Name of the table containing the test data
    n_rec_item INTEGER -- (optional, default:10) Parameter for selecting the top-n recommended items
) RETURNS VOID AS $$
PythonFunction(recommendation_systems, item_based_cf, train)
$$ LANGUAGE plpythonu;
-- 7-argument overload: n_rec_item defaults (NULL -> 10 inside Python).
CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.itemcf_train(
    train_table TEXT,
    user_varname TEXT,
    item_varname TEXT,
    rating_varname TEXT,
    k_sim_item INTEGER,
    use_iuf_similarity BOOL,
    test_table TEXT
) RETURNS VOID AS $$
SELECT MADLIB_SCHEMA.itemcf_train($1, $2, $3, $4, $5, $6, $7, NULL);
$$ LANGUAGE sql;
-- 6-argument overload: no test table, default n_rec_item.
CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.itemcf_train(
    train_table TEXT,
    user_varname TEXT,
    item_varname TEXT,
    rating_varname TEXT,
    k_sim_item INTEGER,
    use_iuf_similarity BOOL
) RETURNS VOID AS $$
SELECT MADLIB_SCHEMA.itemcf_train($1, $2, $3, $4, $5, $6, NULL, NULL);
$$ LANGUAGE sql;
-- 5-argument overload: plain (non-IUF) similarity, no test table.
CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.itemcf_train(
    train_table TEXT,
    user_varname TEXT,
    item_varname TEXT,
    rating_varname TEXT,
    k_sim_item INTEGER
) RETURNS VOID AS $$
SELECT MADLIB_SCHEMA.itemcf_train($1, $2, $3, $4, $5, FALSE, NULL, NULL);
$$ LANGUAGE sql;
-------------
-- predict --
-------------
-- Dispatches into the Python predict() of the item_based_cf module.
CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.itemcf_predict(
    train_table TEXT, -- Name of the table containing the train data
    predict_table TEXT, -- Name of the table containing the predict data
    user_varname TEXT, -- Name of user column from the train table
    recommend_table TEXT, -- Name of the output table containing the recommended items
    n_rec_item INTEGER -- Parameter for selecting the top-n recommended items
) RETURNS VOID AS $$
PythonFunction(recommendation_systems, item_based_cf, predict)
$$ LANGUAGE plpythonu;

View File

@ -1,232 +0,0 @@
# -*- coding:utf-8 -*-
# Copyright (c) 2020 Huawei Technologies Co.,Ltd.
#
# openGauss is licensed under Mulan PSL v2.
# You can use this software according to the terms
# and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
#
# http://license.coscl.org.cn/MulanPSL2
#
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS,
# WITHOUT WARRANTIES OF ANY KIND,
# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
# See the Mulan PSL v2 for more details.
# ----------------------------------------------------------------------------
from collections import defaultdict
from operator import itemgetter
import math
import random
import plpy
import json
from utilities.validate_args import quote_ident
def gen_negative_sample(items, items_list):
    """Build an SGD sample set for one user.

    Every interacted item is labelled 1; randomly drawn non-interacted
    items from items_list are labelled 0, stopping at roughly 9x the
    number of positives (drawing at most 11x positives candidates, so
    fewer negatives may be produced when duplicates are drawn).
    """
    samples = {positive: 1 for positive in items}
    n_pos = len(items)
    for _ in range(n_pos * 11):
        candidate = items_list[random.randint(0, len(items_list) - 1)]
        if candidate not in samples:
            samples[candidate] = 0
            if len(samples) >= 10 * n_pos:
                break
    return samples
def train_validate(train_table, user_varname, item_varname,
                   rating_varname, k_factor, epochs, alpha, lamb, test_table, n_rec_item):
    """Check LFM_train arguments and normalize the optional ones.

    Aborts via plpy.error when any name is missing, k_factor/epochs < 1,
    or alpha/lamb are missing or negative.  A falsy test_table becomes
    None; a missing n_rec_item defaults to 10 (explicit non-positive
    values are rejected).  Returns (test_table, n_rec_item).
    """
    names_ok = all((train_table, user_varname, item_varname, rating_varname))
    counts_ok = k_factor and k_factor >= 1 and epochs and epochs >= 1
    rates_ok = alpha and alpha >= 0 and lamb and lamb >= 0
    if not (names_ok and counts_ok and rates_ok):
        plpy.error("The input parameters are invalid.")
    test_table = test_table or None
    if n_rec_item or n_rec_item == 0:
        if n_rec_item <= 0:
            plpy.error("The input parameters are invalid.")
    else:
        n_rec_item = 10
    return test_table, n_rec_item
def train(schema_madlib, train_table, user_varname, item_varname,
          rating_varname, k_factor, epochs, alpha, lamb,
          test_table, n_rec_item, **kwargs):
    """Train a latent-factor model (SGD matrix factorization) in-database.

    Reads (user, item, rating) rows from train_table, fits user factors P
    and item factors Q with negative sampling, scores every not-yet-rated
    item for each user, and stores the scores as JSON in
    train_table + '_LFM_score'.  When test_table is given, the top
    n_rec_item recommendations are evaluated and
    precision/recall/coverage/popularity are reported via plpy.info.

    :param schema_madlib: MADlib schema name (unused here)
    :param k_factor: number of latent factors per user/item
    :param epochs: SGD epochs; the learning rate decays by 0.9 per epoch
    :param alpha: initial learning rate
    :param lamb: L2 regularization coefficient
    """
    # 0) Validate input parameters
    test_table, n_rec_item = train_validate(train_table, user_varname, item_varname,
        rating_varname, k_factor, epochs, alpha, lamb, test_table, n_rec_item)
    # 1) Generate user-item-rating matrix
    sql = """select {user_varname}, {item_varname}, {rating_varname} from {train_table};""".format(
        user_varname=quote_ident(user_varname),
        item_varname=quote_ident(item_varname),
        rating_varname=quote_ident(rating_varname),
        train_table=train_table
    )
    results = plpy.execute(sql)
    user_item_rating_mat = defaultdict(dict)
    for result in results:
        user_item_rating_mat[result[user_varname]][result[item_varname]] = result[rating_varname]
    # 2) Init and train
    # 2.1) Init user item set.  items_list intentionally keeps duplicates
    # (one entry per interaction) so negative sampling is popularity-biased.
    users_set, items_set = set(), set()
    items_list = []
    item_popular = defaultdict(int)
    for user, items in user_item_rating_mat.items():
        for item in items:
            item_popular[item] += 1
            users_set.add(user)
            items_set.add(item)
            items_list.append(item)
    items_count = len(items_set)
    # 2.2) Init model: small random factors scaled by 1/sqrt(k_factor)
    P = dict()
    Q = dict()
    for user in users_set:
        P[user] = [random.random()/math.sqrt(k_factor) for _ in range(k_factor)]
    for item in items_set:
        Q[item] = [random.random()/math.sqrt(k_factor) for _ in range(k_factor)]
    # 2.3) Train model
    for epoch in range(epochs):
        plpy.info('Training epoch:', epoch)
        for user in user_item_rating_mat:
            samples = gen_negative_sample(user_item_rating_mat[user], items_list)
            for item, rui in samples.items():
                # Predicted preference: dot product of user/item factors.
                rate_e = 0
                for k in range(k_factor):
                    Puk = P[user][k]
                    Qki = Q[item][k]
                    rate_e += Puk * Qki
                eui = rui - rate_e
                # SGD step with L2 regularization.  NOTE(review): the Q
                # update uses the freshly updated P[user][k] (sequential,
                # not simultaneous, update).
                for k in range(k_factor):
                    P[user][k] += alpha * (eui * Q[item][k] - lamb * P[user][k])
                    Q[item][k] += alpha * (eui * P[user][k] - lamb * Q[item][k])
        # Decay the learning rate each epoch.
        alpha *= 0.9
        # NOTE(review): debug trace -- assumes a user with id 1 and an item
        # with id 1 exist; raises KeyError otherwise.
        plpy.info(P[1],Q[1])
    # 5) Generate and store user-item-score for items the user has not rated
    user_item_score_mat = {}
    for user in users_set:
        user_item_score_mat.setdefault(user, defaultdict(float))
        interacted_items = user_item_rating_mat[user]
        for item in items_set:
            if item in interacted_items.keys():
                continue
            for k, Qik in enumerate(Q[item]):
                user_item_score_mat[user][item] += P[user][k] * Qik
    user_item_score_table = train_table + '_LFM_score'
    sql = """drop table if exists {user_item_score_table};
    create table {user_item_score_table} (userid integer, scored_item json);""".format(
        user_item_score_table=user_item_score_table
    )
    plpy.execute(sql)
    for userid, items in user_item_score_mat.items():
        items_json = json.dumps(items)
        sql = """insert into {user_item_score_table} values ({userid}, $${items_json}$$);""".format(
            user_item_score_table=user_item_score_table,
            userid=userid,
            items_json=items_json
        )
        plpy.execute(sql)
    plpy.info('Training finish! Users\' scored items are stored in table ' + user_item_score_table)
    # 6) Test recommendation results
    if test_table:
        sql = """select {user_varname}, {item_varname} from {test_table};""".format(
            user_varname=quote_ident(user_varname),
            item_varname=quote_ident(item_varname),
            test_table=test_table
        )
        results = plpy.execute(sql)
        user_item_true_mat = defaultdict(set)
        for result in results:
            user_item_true_mat[result[user_varname]].add(result[item_varname])
        hit_count = 0
        rec_count = 0
        test_count = 0
        all_rec_items = set()
        popular_sum = 0
        for user in users_set:
            if user not in user_item_true_mat:
                continue
            true_items = user_item_true_mat[user]
            scored_items = user_item_score_mat[user]
            for item, _ in sorted(scored_items.items(), key=itemgetter(1), reverse=True)[:n_rec_item]:
                if item in true_items:
                    hit_count += 1
                all_rec_items.add(item)
                popular_sum += math.log(1 + item_popular[item])
            rec_count += n_rec_item
            test_count += len(true_items)
        # NOTE(review): if no training user appears in the test table,
        # rec_count/test_count stay 0 and these divisions raise.
        precision = hit_count / (1.0 * rec_count)
        recall = hit_count / (1.0 * test_count)
        coverage = len(all_rec_items) / (1.0 * items_count)
        popularity = popular_sum / (1.0 * rec_count)
        plpy.info('Testing finish! Precision:', precision, 'Recall:', recall, 'Coverage:', coverage, 'Popularity:', popularity)
def predict_validate(train_table, predict_table, user_varname, recommend_table, n_rec_item):
    """Abort via plpy.error unless every argument is supplied and n_rec_item >= 1."""
    if not (train_table and predict_table and user_varname and recommend_table
            and n_rec_item and n_rec_item >= 1):
        plpy.error("The input parameters are invalid.")
def predict(schema_madlib, train_table, predict_table, user_varname, recommend_table, n_rec_item, **kwargs):
    """Recommend the top-n LFM-scored items for every user in predict_table.

    Reads the scores produced by LFM training from
    train_table + '_LFM_score', keeps the n_rec_item best items per user,
    and writes them into recommend_table
    (userid integer, recommend_item integer[]).  Users absent from the
    score table are reported as cold-start users and skipped.

    :param schema_madlib: MADlib schema name (unused here)
    :param train_table: table LFM_train was run on (score-table prefix)
    :param predict_table: table listing the users to recommend for
    :param user_varname: user column name in predict_table
    :param recommend_table: output table name (dropped and recreated)
    :param n_rec_item: number of items to recommend per user
    """
    # 0) Validate input parameters
    predict_validate(train_table, predict_table, user_varname, recommend_table, n_rec_item)
    # 1) Read user-item-score table
    sql = """select userid, scored_item from {user_item_score_table};""".format(
        user_item_score_table=train_table+'_LFM_score'
    )
    results = plpy.execute(sql)
    user_item_score_mat = {}
    for result in results:
        userid = result['userid']
        user_item_score_mat.setdefault(userid, [])
        # scored_item is a JSON object {itemid: score}; keep the n best ids.
        scored_items = json.loads(result['scored_item'])
        for itemid, _ in sorted(scored_items.items(), key=itemgetter(1), reverse=True)[:n_rec_item]:
            user_item_score_mat[userid].append(itemid)
    # 2) Read predict table
    sql = """select {user_varname} from {predict_table};""".format(
        user_varname=quote_ident(user_varname),
        predict_table=predict_table
    )
    results = plpy.execute(sql)
    users = set()
    for result in results:
        users.add(result[user_varname])
    # 3) Generate recommend table
    sql = """drop table if exists {recommend_table};
    create table {recommend_table} (userid integer, recommend_item integer[]);""".format(
        recommend_table=recommend_table
    )
    plpy.execute(sql)
    for user in users:
        if user not in user_item_score_mat:
            plpy.info("Userid", user, "is a cold-start user!")
            continue
        # JSON keys are strings; cast back to int for the SQL array literal.
        recommend_item = [int(x) for x in user_item_score_mat[user]]
        sql = """insert into {recommend_table} values ({userid}, array{recommend_item});""".format(
            recommend_table=recommend_table,
            userid=user,
            recommend_item=recommend_item
        )
        plpy.execute(sql)
    plpy.info('Recommending finish! Users\' recommended items are stored in table ' + recommend_table)

View File

@ -1,87 +0,0 @@
/*
* Portions Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*/
-------------------------------------------
-- Build latent factor model in database --
-------------------------------------------
------------------------------------------------------------------------------------
-- Note: This module allows you to use SQL to call latent factor model algorithm. --
------------------------------------------------------------------------------------
m4_include(`SQLCommon.m4')
-----------------
-- train & test--
-----------------
---------------------------------------------------------------------------------------
-- train & test -- intermediate output table ------------------------------------------
-- 1) train_table + '_LFM_score': Table that stores all users and their scored items --
---------------------------------------------------------------------------------------
-- Full 10-argument entry point: dispatches into the Python train() of the
-- latent_factor_model module.
CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.LFM_train(
    train_table TEXT, -- Name of the table containing the train data
    user_varname TEXT, -- Name of the user column from the train table
    item_varname TEXT, -- Name of the item column from the train table
    rating_varname TEXT, -- Name of the rating column from the train table
    k_factor INTEGER, -- Parameter for determining the number of latent factors
    epochs INTEGER, -- Parameter for determining the number of training epochs
    alpha double precision, -- Parameter for determining the learning rate
    lamb double precision, -- Parameter for determining the regularization coefficient
    test_table TEXT, -- (optional, default:None) Name of the table containing the test data
    n_rec_item INTEGER -- (optional, default:10) Parameter for selecting the top-n recommended items
) RETURNS VOID AS $$
PythonFunction(recommendation_systems, latent_factor_model, train)
$$ LANGUAGE plpythonu;
-- 9-argument overload: n_rec_item defaults (NULL -> 10 inside Python).
CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.LFM_train(
    train_table TEXT,
    user_varname TEXT,
    item_varname TEXT,
    rating_varname TEXT,
    k_factor INTEGER,
    epochs INTEGER,
    alpha double precision,
    lamb double precision,
    test_table TEXT
) RETURNS VOID AS $$
SELECT MADLIB_SCHEMA.LFM_train($1, $2, $3, $4, $5, $6, $7, $8, $9, NULL);
$$ LANGUAGE sql;
-- 8-argument overload: no test table, default n_rec_item.
CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.LFM_train(
    train_table TEXT,
    user_varname TEXT,
    item_varname TEXT,
    rating_varname TEXT,
    k_factor INTEGER,
    epochs INTEGER,
    alpha double precision,
    lamb double precision
) RETURNS VOID AS $$
SELECT MADLIB_SCHEMA.LFM_train($1, $2, $3, $4, $5, $6, $7, $8, NULL, NULL);
$$ LANGUAGE sql;
-------------
-- predict --
-------------
-- Dispatches into the Python predict() of the latent_factor_model module.
CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.LFM_predict(
    train_table TEXT, -- Name of the table containing the train data
    predict_table TEXT, -- Name of the table containing the predict data
    user_varname TEXT, -- Name of user column from the train table
    recommend_table TEXT, -- Name of the output table containing the recommended items
    n_rec_item INTEGER -- Parameter for selecting the top-n recommended items
) RETURNS VOID AS $$
PythonFunction(recommendation_systems, latent_factor_model, predict)
$$ LANGUAGE plpythonu;

View File

@ -1,12 +0,0 @@
-- Install-check script: load the MovieLens-100k fixture tables, then
-- exercise itemcf train (with IUF and a test table) and predict.
\i m4_regexp(MODULE_PATHNAME,
`\(.*\)libmadlib\.so',
`\1../../modules/recommendation_systems/test/movielens100k_dataset.setup.sql_in'
)
m4_include(`SQLCommon.m4')
-- train & test & predict --
select madlib.itemcf_train('public.test_movielens100k_train','UserId','ItemId','Rating',10,TRUE,'public.test_movielens100k_test',10);
select madlib.itemcf_predict('public.test_movielens100k_train','public.test_movielens100k_test','UserId','public.test_movielens100k_recommend',10);

View File

@ -1,12 +0,0 @@
-- Install-check script: load the MovieLens-100k fixture tables, then
-- exercise LFM train (200 factors, 20 epochs) and predict.
\i m4_regexp(MODULE_PATHNAME,
`\(.*\)libmadlib\.so',
`\1../../modules/recommendation_systems/test/movielens100k_dataset.setup.sql_in'
)
m4_include(`SQLCommon.m4')
-- train & test & predict --
select madlib.LFM_train('public.test_movielens100k_train','UserId','ItemId','Rating',200,20,0.02,0.01,'public.test_movielens100k_test',10);
select madlib.LFM_predict('public.test_movielens100k_train','public.test_movielens100k_test','UserId','public.test_movielens100k_recommend',10);

View File

@ -1,25 +0,0 @@
-- Install-check script: load the MovieLens-100k fixture tables, then
-- exercise usercf train/predict through each overload (with/without IIF,
-- default n, and no test table).
\i m4_regexp(MODULE_PATHNAME,
`\(.*\)libmadlib\.so',
`\1../../modules/recommendation_systems/test/movielens100k_dataset.setup.sql_in'
)
m4_include(`SQLCommon.m4')
-- train & test & predict --
select madlib.usercf_train('public.test_movielens100k_train','UserId','ItemId','Rating',10,TRUE,'public.test_movielens100k_test',10);
select madlib.usercf_predict('public.test_movielens100k_train','public.test_movielens100k_test','UserId','public.test_movielens100k_recommend',10);
-- train(not use iif) & test & predict --
select madlib.usercf_train('public.test_movielens100k_train','UserId','ItemId','Rating',10,FALSE,'public.test_movielens100k_test',10);
select madlib.usercf_predict('public.test_movielens100k_train','public.test_movielens100k_test','UserId','public.test_movielens100k_recommend',10);
-- train & test(use default parameter n) & predict --
select madlib.usercf_train('public.test_movielens100k_train','UserId','ItemId','Rating',10,TRUE,'public.test_movielens100k_test');
select madlib.usercf_predict('public.test_movielens100k_train','public.test_movielens100k_test','UserId','public.test_movielens100k_recommend',10);
-- train & predict --
select madlib.usercf_train('public.test_movielens100k_train','UserId','ItemId','Rating',10,TRUE);
select madlib.usercf_predict('public.test_movielens100k_train','public.test_movielens100k_test','UserId','public.test_movielens100k_recommend',10);

View File

@ -1,262 +0,0 @@
# -*- coding:utf-8 -*-
# Copyright (c) 2020 Huawei Technologies Co.,Ltd.
#
# openGauss is licensed under Mulan PSL v2.
# You can use this software according to the terms
# and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
#
# http://license.coscl.org.cn/MulanPSL2
#
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS,
# WITHOUT WARRANTIES OF ANY KIND,
# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
# See the Mulan PSL v2 for more details.
# ----------------------------------------------------------------------------
from collections import defaultdict
from operator import itemgetter
import math
import random
import plpy
import json
from utilities.validate_args import quote_ident
from utilities.validate_args import table_exists
def train_validate(train_table, user_varname, item_varname,
                   rating_varname, k_sim_user, use_iif_similarity,
                   test_table, n_rec_item):
    """Check usercf_train arguments and normalize the optional ones.

    Aborts via plpy.error when a required argument is missing or when
    k_sim_user < 1.  A falsy use_iif_similarity becomes False, a falsy
    test_table becomes None, and a missing n_rec_item defaults to 10
    (an explicit non-positive n_rec_item is rejected).

    Returns the normalized (use_iif_similarity, test_table, n_rec_item).
    """
    required = (train_table, user_varname, item_varname, rating_varname)
    if not all(required) or not k_sim_user or k_sim_user < 1:
        plpy.error("The input parameters are invalid.")
    # Only falsy values are coerced; truthy values pass through unchanged.
    if not use_iif_similarity:
        use_iif_similarity = False
    test_table = test_table or None
    if n_rec_item or n_rec_item == 0:
        # Caller supplied a value (0 counts as supplied): must be positive.
        if n_rec_item <= 0:
            plpy.error("The input parameters are invalid.")
    else:
        n_rec_item = 10
    return use_iif_similarity, test_table, n_rec_item
def train(schema_madlib, train_table, user_varname, item_varname,
          rating_varname, k_sim_user, use_iif_similarity,
          test_table, n_rec_item, **kwargs):
    """Train a user-based collaborative-filtering recommender in-database.

    Reads (user, item, rating) rows from train_table, builds (or reuses) a
    user-user similarity table, scores every not-yet-rated item for each
    user from the k most similar users, and stores the scores as JSON in
    train_table + '_usercf_score'.  When test_table is given, the top
    n_rec_item recommendations per user are evaluated and
    precision/recall/coverage/popularity are reported via plpy.info.

    :param schema_madlib: MADlib schema name (unused here)
    :param k_sim_user: number of most-similar users considered per user
    :param use_iif_similarity: use the IIF-weighted similarity variant

    NOTE(review): quote_ident covers the column names, but train_table and
    test_table are interpolated into SQL unquoted -- callers must pass
    trusted table names.
    """
    # 0) Validate input parameter
    use_iif_similarity, test_table, n_rec_item = train_validate(train_table,
        user_varname, item_varname, rating_varname, k_sim_user,
        use_iif_similarity, test_table, n_rec_item)
    # 1) Generate user-item-rating matrix
    sql = """select {user_varname}, {item_varname}, {rating_varname} from {train_table};""".format(
        user_varname=quote_ident(user_varname),
        item_varname=quote_ident(item_varname),
        rating_varname=quote_ident(rating_varname),
        train_table=train_table
    )
    results = plpy.execute(sql)
    user_item_rating_mat = defaultdict(dict)
    for result in results:
        user_item_rating_mat[result[user_varname]][result[item_varname]] = result[rating_varname]
    # 2) Generate user-user-similarity matrix
    # 2.1) Set user_user_sim_table name
    if use_iif_similarity:
        user_user_sim_table = train_table+'_usercf_iif_sim'
    else:
        user_user_sim_table = train_table+'_usercf_sim'
    # 2.2) First time generate user_user_sim_table
    # The similarity table is cached across calls: it is only rebuilt when
    # absent.  NOTE(review): a stale table from an earlier run with different
    # training data is silently reused -- drop it to force a rebuild.
    if not table_exists(user_user_sim_table):
        # 2.2.1) Make item_user reverse set (item -> users who rated it)
        item_user_set = defaultdict(set)
        for user, items in user_item_rating_mat.items():
            for item in items:
                item_user_set[item].add(user)
        # 2.2.2) Make user_user_sim_mat
        # _val accumulates raw co-rating counts; _sum holds the normalized
        # (cosine-style) similarity derived from _val.
        user_user_sim_mat_val = {}
        user_user_sim_mat_sum = {}
        for item, users in item_user_set.items():
            for useri in users:
                user_user_sim_mat_val.setdefault(useri, defaultdict(float))
                user_user_sim_mat_sum.setdefault(useri, defaultdict(float))
                len_useri = len(user_item_rating_mat[useri])
                for userj in users:
                    len_userj = len(user_item_rating_mat[userj])
                    if useri == userj:
                        continue
                    if use_iif_similarity:
                        # IIF: down-weight co-ratings on very popular items
                        # (large len(users)).
                        user_user_sim_mat_val[useri][userj] += 1 / math.log(1 + len(users))
                        user_user_sim_mat_sum[useri][userj] = user_user_sim_mat_val[useri][userj] / math.sqrt(len_useri * len_userj)
                    else:
                        user_user_sim_mat_val[useri][userj] += 1
                        user_user_sim_mat_sum[useri][userj] = user_user_sim_mat_val[useri][userj] / math.sqrt(len_useri * len_userj)
        # 2.2.3) store user_user_sim_mat (one JSON row per user)
        sql = """drop table if exists {user_user_sim_table};
    create table {user_user_sim_table} (userid integer, sim_user json);""".format(
            user_user_sim_table=user_user_sim_table
        )
        plpy.execute(sql)
        for userid, users in user_user_sim_mat_sum.items():
            users_json = json.dumps(users)
            # $$ dollar-quoting protects the JSON payload from quote issues.
            sql = """insert into {user_user_sim_table} values ({userid}, $${users_json}$$);""".format(
                user_user_sim_table=user_user_sim_table,
                userid=userid,
                users_json=users_json
            )
            plpy.execute(sql)
    # 2.3) Read user_user_sim_table
    sql = """select * from {user_user_sim_table};""".format(
        user_user_sim_table=user_user_sim_table,
    )
    results = plpy.execute(sql)
    user_user_sim_mat = {}
    for result in results:
        useri = result['userid']
        user_user_sim_mat.setdefault(useri, defaultdict(float))
        users_json = json.loads(result['sim_user'])
        for userj, useri_userj_sim in users_json.items():
            # JSON object keys come back as strings; restore int user ids.
            user_user_sim_mat[useri][int(userj)] = useri_userj_sim
    # 3) Generate user-item-score matrix
    # 3.1) Make user_item_score table: score(u, i) = sum over the k most
    # similar users v of sim(u, v) * rating(v, i), skipping items u rated.
    user_item_score_mat = {}
    for useri, users in user_user_sim_mat.items():
        interacted_items = user_item_rating_mat[useri]
        user_item_score_mat.setdefault(useri, defaultdict(float))
        for userj, similarity in sorted(users.items(), key=itemgetter(1), reverse=True)[:k_sim_user]:
            for item, rating in user_item_rating_mat[userj].items():
                if item in interacted_items:
                    continue
                user_item_score_mat[useri][item] += similarity * rating
    # 3.2) Store user_item_score_mat
    user_item_score_table = train_table + '_usercf_score'
    sql = """drop table if exists {user_item_score_table};
    create table {user_item_score_table} (userid integer, scored_item json);""".format(
        user_item_score_table=user_item_score_table
    )
    plpy.execute(sql)
    for userid, items in user_item_score_mat.items():
        items_json = json.dumps(items)
        sql = """insert into {user_item_score_table} values ({userid}, $${items_json}$$);""".format(
            user_item_score_table=user_item_score_table,
            userid=userid,
            items_json=items_json
        )
        plpy.execute(sql)
    plpy.info('Training finish! Users\' scored items are stored in table ' + user_item_score_table)
    # 4) Test recommendation results
    if test_table:
        # 4.1) Do statistic over the TRAINING data (popularity, catalog size)
        item_set = set()
        item_popular = defaultdict(int)
        for user, items in user_item_rating_mat.items():
            for item in items:
                item_set.add(item)
                item_popular[item] += 1
        item_count = len(item_set)
        # 4.2) Read true data (held-out user/item interactions)
        sql = """select {user_varname}, {item_varname} from {test_table};""".format(
            user_varname=quote_ident(user_varname),
            item_varname=quote_ident(item_varname),
            test_table=test_table
        )
        results = plpy.execute(sql)
        user_item_true_mat = defaultdict(set)
        for result in results:
            user_item_true_mat[result[user_varname]].add(result[item_varname])
        # 4.3) Test: count hits among the top-n scored items of each user
        # that also appears in the test set.
        hit_count = 0
        rec_count = 0
        test_count = 0
        all_rec_items = set()
        popular_sum = 0
        for user in user_item_rating_mat:
            if user not in user_item_true_mat:
                continue
            true_items = user_item_true_mat[user]
            scored_items = user_item_score_mat[user]
            for item, _ in sorted(scored_items.items(), key=itemgetter(1), reverse=True)[:n_rec_item]:
                if item in true_items:
                    hit_count += 1
                all_rec_items.add(item)
                popular_sum += math.log(1 + item_popular[item])
            rec_count += n_rec_item
            test_count += len(true_items)
        # NOTE(review): if no training user appears in the test table,
        # rec_count/test_count stay 0 and these divisions raise.
        precision = hit_count / (1.0 * rec_count)
        recall = hit_count / (1.0 * test_count)
        coverage = len(all_rec_items) / (1.0 * item_count)
        popularity = popular_sum / (1.0 * rec_count)
        plpy.info('Testing finish! Precision:', precision, 'Recall:', recall, 'Coverage:', coverage, 'Popularity:', popularity)
def predict_validate(train_table, predict_table, user_varname, recommend_table, n_rec_item):
    """Abort via plpy.error when any predict() argument is missing or invalid.

    All four table/column names must be non-empty and n_rec_item must be a
    positive integer; otherwise execution stops with an error message.
    """
    required = (train_table, predict_table, user_varname, recommend_table, n_rec_item)
    if any(not value for value in required) or n_rec_item < 1:
        plpy.error("The input parameters are invalid.")
def predict(schema_madlib, train_table, predict_table, user_varname, recommend_table, n_rec_item, **kwargs):
    """Recommend the top-n scored items for every user in predict_table.

    Reads the intermediate score table written by train() (named
    train_table + '_usercf_score'), keeps the n_rec_item highest-scored
    items per user, and writes them into recommend_table with schema
    (userid integer, recommend_item integer[]).  Users present in
    predict_table but absent from the score table are reported as
    cold-start users and skipped.
    """
    # 0) Validate input parameters
    predict_validate(train_table, predict_table, user_varname, recommend_table, n_rec_item)
    user_item_score_table = train_table + '_usercf_score'
    # The score table is created by train(); fail early with a clear message
    # instead of a raw SQL error when training has not been run.
    if not table_exists(user_item_score_table):
        plpy.error("Table " + user_item_score_table + " does not exist. Run usercf_train first.")
    # 1) Read user-item-score table
    sql = """select userid, scored_item from {user_item_score_table};""".format(
        user_item_score_table=user_item_score_table
    )
    results = plpy.execute(sql)
    user_item_score_mat = {}
    for result in results:
        userid = result['userid']
        scored_items = json.loads(result['scored_item'])
        # Keep only the n_rec_item items with the highest scores.
        user_item_score_mat[userid] = [
            itemid for itemid, _ in
            sorted(scored_items.items(), key=itemgetter(1), reverse=True)[:n_rec_item]
        ]
    # 2) Read predict table
    sql = """select {user_varname} from {predict_table};""".format(
        user_varname=quote_ident(user_varname),
        predict_table=predict_table
    )
    results = plpy.execute(sql)
    users = set()
    for result in results:
        users.add(result[user_varname])
    # 3) Generate recommend table
    sql = """drop table if exists {recommend_table};
    create table {recommend_table} (userid integer, recommend_item integer[]);""".format(
        recommend_table=recommend_table
    )
    plpy.execute(sql)
    for user in users:
        if user not in user_item_score_mat:
            plpy.info("Userid", user, "is a cold-start user!")
            continue
        recommend_item = [int(x) for x in user_item_score_mat[user]]
        # The '::integer[]' cast keeps the statement valid even when the list
        # is empty: an untyped 'array[]' literal raises "cannot determine type".
        sql = """insert into {recommend_table} values ({userid}, array{recommend_item}::integer[]);""".format(
            recommend_table=recommend_table,
            userid=user,
            recommend_item=recommend_item
        )
        plpy.execute(sql)
    plpy.info('Recommending finish! Users\' recommended items are stored in table ' + recommend_table)

View File

@ -1,93 +0,0 @@
/*
* Portions Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*/
-------------------------------------
-- Build user-based CF in database --
-------------------------------------
---------------------------------------------------------------------------------------------------
-- Note: This module allows you to use SQL to call user-based collaborative filtering algorithm. --
---------------------------------------------------------------------------------------------------
m4_include(`SQLCommon.m4')
-----------------
-- train & test--
-----------------
----------------------------------------------------------------------------------------------------------
-- train & test -- intermediate output table -------------------------------------------------------------
-- 1) train_table + '_usercf_sim': Table that stores all users and their similar users -------------------
-- 2) train_table + '_usercf_iif_sim': Table that stores all users and their similar users by using iif --
-- 3) train_table + '_usercf_score': Table that stores all users and their scored items ------------------
----------------------------------------------------------------------------------------------------------
-- Entry point: full 8-argument form of usercf_train.
-- Trains a user-based collaborative-filtering model from train_table and,
-- when test_table is given, evaluates precision/recall/coverage/popularity.
-- Intermediate results are persisted in tables derived from train_table
-- (the '_usercf_sim' / '_usercf_iif_sim' / '_usercf_score' suffixes).
CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.usercf_train(
train_table TEXT, -- Name of the table containing the train data
user_varname TEXT, -- Name of the user column from the train table
item_varname TEXT, -- Name of the item column from the train table
rating_varname TEXT, -- Name of the rating column from the train table
k_sim_user INTEGER, -- Parameter for selecting the top-k similar users
use_iif_similarity BOOL, -- (optional, default:False) Determine whether to use the improved algorithm version
test_table TEXT, -- (optional, default:None) Name of the table containing the test data
n_rec_item INTEGER -- (optional, default:10) Parameter for selecting the top-n recommended items
) RETURNS VOID AS $$
PythonFunction(recommendation_systems, user_based_cf, train)
$$ LANGUAGE plpythonu;
-- 7-argument overload: omits n_rec_item (passed as NULL; the Python layer
-- treats NULL as the default of 10).
CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.usercf_train(
train_table TEXT,
user_varname TEXT,
item_varname TEXT,
rating_varname TEXT,
k_sim_user INTEGER,
use_iif_similarity BOOL,
test_table TEXT
) RETURNS VOID AS $$
SELECT MADLIB_SCHEMA.usercf_train($1, $2, $3, $4, $5, $6, $7, NULL);
$$ LANGUAGE sql;
-- 6-argument overload: omits test_table and n_rec_item (no evaluation is run
-- when test_table is NULL).
CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.usercf_train(
train_table TEXT,
user_varname TEXT,
item_varname TEXT,
rating_varname TEXT,
k_sim_user INTEGER,
use_iif_similarity BOOL
) RETURNS VOID AS $$
SELECT MADLIB_SCHEMA.usercf_train($1, $2, $3, $4, $5, $6, NULL, NULL);
$$ LANGUAGE sql;
-- 5-argument overload: minimal form; uses the plain similarity measure
-- (use_iif_similarity = FALSE) and runs no evaluation.
CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.usercf_train(
train_table TEXT,
user_varname TEXT,
item_varname TEXT,
rating_varname TEXT,
k_sim_user INTEGER
) RETURNS VOID AS $$
SELECT MADLIB_SCHEMA.usercf_train($1, $2, $3, $4, $5, FALSE, NULL, NULL);
$$ LANGUAGE sql;
-------------
-- predict --
-------------
-- Recommend the top-n items for each user in predict_table, based on the
-- score table produced by usercf_train (train_table + '_usercf_score').
-- Results are written into recommend_table (userid, recommend_item[]).
CREATE OR REPLACE FUNCTION MADLIB_SCHEMA.usercf_predict(
train_table TEXT, -- Name of the table containing the train data
predict_table TEXT, -- Name of the table containing the predict data
user_varname TEXT, -- Name of user column from the train table
recommend_table TEXT, -- Name of the output table containing the recommended items
n_rec_item INTEGER -- Parameter for selecting the top-n recommended items
) RETURNS VOID AS $$
PythonFunction(recommendation_systems, user_based_cf, predict)
$$ LANGUAGE plpythonu;