public class ALS extends Estimator<ALSModel> implements ALSParams, DefaultParamsWritable
ALS attempts to estimate the ratings matrix R
as the product of two lower-rank matrices,
X
and Y
, i.e. X * Yt = R
. Typically these approximations are called 'factor' matrices.
The general approach is iterative. During each iteration, one of the factor matrices is held
constant, while the other is solved for using least squares. The newly-solved factor matrix is
then held constant while solving for the other factor matrix.
This is a blocked implementation of the ALS factorization algorithm that groups the two sets of factors (referred to as "users" and "products") into blocks and reduces communication by only sending one copy of each user vector to each product block on each iteration, and only for the product blocks that need that user's feature vector. This is achieved by pre-computing some information about the ratings matrix to determine the "out-links" of each user (which blocks of products it will contribute to) and "in-link" information for each product (which of the feature vectors it receives from each user block it will depend on). This allows us to send only an array of feature vectors between each user block and product block, and have the product block find the users' ratings and update the products based on these messages.
For implicit preference data, the algorithm used is based on "Collaborative Filtering for Implicit Feedback Datasets", available at https://doi.org/10.1109/ICDM.2008.22, adapted for the blocked approach used here.
Essentially instead of finding the low-rank approximations to the rating matrix R
,
this finds the approximations for a preference matrix P
where the elements of P
are 1 if
r is greater than 0 and 0 if r is less than or equal to 0. The ratings then act as 'confidence'
values related to strength of indicated user
preferences rather than explicit ratings given to items.
Note: the input rating dataset to the ALS implementation should be deterministic.
Nondeterministic data can cause failure during fitting ALS model.
For example, an order-sensitive operation like sampling after a repartition makes dataset
output nondeterministic, like dataset.repartition(2).sample(false, 0.5, 1618)
.
Checkpointing sampled dataset or adding a sort before sampling can help make the dataset
deterministic.
Modifier and Type | Class and Description |
---|---|
static class |
ALS.InBlock$ |
static interface |
ALS.LeastSquaresNESolver
Trait for least squares solvers applied to the normal equation.
|
static class |
ALS.Rating<ID>
Rating class for better code readability.
|
static class |
ALS.Rating$ |
static class |
ALS.RatingBlock$ |
Modifier and Type | Method and Description |
---|---|
DoubleParam |
alpha()
Param for the alpha parameter in the implicit preference formulation (nonnegative).
|
IntParam |
blockSize()
Param for block size for stacking input data in matrices.
|
IntParam |
checkpointInterval()
Param for set checkpoint interval (>= 1) or disable checkpoint (-1).
|
Param<String> |
coldStartStrategy()
Param for strategy for dealing with unknown or new users/items at prediction time.
|
ALS |
copy(ParamMap extra)
Creates a copy of this instance with the same UID and some extra params.
|
Param<String> |
finalStorageLevel()
Param for StorageLevel for ALS model factors.
|
ALSModel |
fit(Dataset<?> dataset)
Fits a model to the input data.
|
BooleanParam |
implicitPrefs()
Param to decide whether to use implicit preference.
|
Param<String> |
intermediateStorageLevel()
Param for StorageLevel for intermediate datasets.
|
Param<String> |
itemCol()
Param for the column name for item ids.
|
static ALS |
load(String path) |
IntParam |
maxIter()
Param for maximum number of iterations (>= 0).
|
BooleanParam |
nonnegative()
Param for whether to apply nonnegativity constraints.
|
IntParam |
numItemBlocks()
Param for number of item blocks (positive).
|
IntParam |
numUserBlocks()
Param for number of user blocks (positive).
|
static void |
org$apache$spark$internal$Logging$$log__$eq(org.slf4j.Logger x$1) |
static org.slf4j.Logger |
org$apache$spark$internal$Logging$$log_() |
Param<String> |
predictionCol()
Param for prediction column name.
|
IntParam |
rank()
Param for rank of the matrix factorization (positive).
|
Param<String> |
ratingCol()
Param for the column name for ratings.
|
static MLReader<T> |
read() |
DoubleParam |
regParam()
Param for regularization parameter (>= 0).
|
LongParam |
seed()
Param for random seed.
|
ALS |
setAlpha(double value) |
ALS |
setBlockSize(int value)
Set block size for stacking input data in matrices.
|
ALS |
setCheckpointInterval(int value) |
ALS |
setColdStartStrategy(String value) |
ALS |
setFinalStorageLevel(String value) |
ALS |
setImplicitPrefs(boolean value) |
ALS |
setIntermediateStorageLevel(String value) |
ALS |
setItemCol(String value) |
ALS |
setMaxIter(int value) |
ALS |
setNonnegative(boolean value) |
ALS |
setNumBlocks(int value)
Sets both numUserBlocks and numItemBlocks to the specific value.
|
ALS |
setNumItemBlocks(int value) |
ALS |
setNumUserBlocks(int value) |
ALS |
setPredictionCol(String value) |
ALS |
setRank(int value) |
ALS |
setRatingCol(String value) |
ALS |
setRegParam(double value) |
ALS |
setSeed(long value) |
ALS |
setUserCol(String value) |
static <ID> scala.Tuple2<RDD<scala.Tuple2<ID,float[]>>,RDD<scala.Tuple2<ID,float[]>>> |
train(RDD<ALS.Rating<ID>> ratings,
int rank,
int numUserBlocks,
int numItemBlocks,
int maxIter,
double regParam,
boolean implicitPrefs,
double alpha,
boolean nonnegative,
StorageLevel intermediateRDDStorageLevel,
StorageLevel finalRDDStorageLevel,
int checkpointInterval,
long seed,
scala.reflect.ClassTag<ID> evidence$1,
scala.math.Ordering<ID> ord)
Implementation of the ALS algorithm.
|
StructType |
transformSchema(StructType schema)
Check transform validity and derive the output schema from the input schema.
|
String |
uid()
An immutable unique ID for the object and its derivatives.
|
Param<String> |
userCol()
Param for the column name for user ids.
|
params
equals, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
getAlpha, getFinalStorageLevel, getImplicitPrefs, getIntermediateStorageLevel, getNonnegative, getNumItemBlocks, getNumUserBlocks, getRank, getRatingCol, validateAndTransformSchema
getColdStartStrategy, getItemCol, getUserCol
getPredictionCol
getBlockSize
clear, copyValues, defaultCopy, defaultParamMap, explainParam, explainParams, extractParamMap, extractParamMap, get, getDefault, getOrDefault, getParam, hasDefault, hasParam, isDefined, isSet, onParamChange, paramMap, params, set, set, set, setDefault, setDefault, shouldOwn
toString
getMaxIter
getRegParam
getCheckpointInterval
write
save
$init$, initializeForcefully, initializeLogIfNecessary, initializeLogIfNecessary, initializeLogIfNecessary$default$2, initLock, isTraceEnabled, log, logDebug, logDebug, logError, logError, logInfo, logInfo, logName, logTrace, logTrace, logWarning, logWarning, org$apache$spark$internal$Logging$$log__$eq, org$apache$spark$internal$Logging$$log_, uninitialize
public static ALS load(String path)
public static <ID> scala.Tuple2<RDD<scala.Tuple2<ID,float[]>>,RDD<scala.Tuple2<ID,float[]>>> train(RDD<ALS.Rating<ID>> ratings, int rank, int numUserBlocks, int numItemBlocks, int maxIter, double regParam, boolean implicitPrefs, double alpha, boolean nonnegative, StorageLevel intermediateRDDStorageLevel, StorageLevel finalRDDStorageLevel, int checkpointInterval, long seed, scala.reflect.ClassTag<ID> evidence$1, scala.math.Ordering<ID> ord)
This implementation of the ALS factorization algorithm partitions the two sets of factors among
Spark workers so as to reduce network communication by only sending one copy of each factor
vector to each Spark worker on each iteration, and only if needed. This is achieved by
precomputing some information about the ratings matrix to determine which users require which
item factors and vice versa. See the Scaladoc for InBlock
for a detailed explanation of how
the precomputation is done.
In addition, since each iteration of calculating the factor matrices depends on the known
ratings, which are spread across Spark partitions, a naive implementation would incur
significant network communication overhead between Spark workers, as the ratings RDD would be
repeatedly shuffled during each iteration. This implementation reduces that overhead by
performing the shuffling operation up front, precomputing each partition's ratings dependencies
and duplicating those values to the appropriate workers before starting iterations to solve for
the factor matrices. See the Scaladoc for OutBlock
for a detailed explanation of how the
precomputation is done.
Note that the term "rating block" is a bit of a misnomer, as the ratings are not partitioned by contiguous blocks from the ratings matrix but by a hash function on the rating's location in the matrix. If it helps you to visualize the partitions, it is easier to think of the term "block" as referring to a subset of an RDD containing the ratings rather than a contiguous submatrix of the ratings matrix.
ratings
- (undocumented)rank
- (undocumented)numUserBlocks
- (undocumented)numItemBlocks
- (undocumented)maxIter
- (undocumented)regParam
- (undocumented)implicitPrefs
- (undocumented)alpha
- (undocumented)nonnegative
- (undocumented)intermediateRDDStorageLevel
- (undocumented)finalRDDStorageLevel
- (undocumented)checkpointInterval
- (undocumented)seed
- (undocumented)evidence$1
- (undocumented)ord
- (undocumented)public static MLReader<T> read()
public static org.slf4j.Logger org$apache$spark$internal$Logging$$log_()
public static void org$apache$spark$internal$Logging$$log__$eq(org.slf4j.Logger x$1)
public IntParam rank()
ALSParams
public IntParam numUserBlocks()
ALSParams
numUserBlocks
in interface ALSParams
public IntParam numItemBlocks()
ALSParams
numItemBlocks
in interface ALSParams
public BooleanParam implicitPrefs()
ALSParams
implicitPrefs
in interface ALSParams
public DoubleParam alpha()
ALSParams
public Param<String> ratingCol()
ALSParams
public BooleanParam nonnegative()
ALSParams
nonnegative
in interface ALSParams
public Param<String> intermediateStorageLevel()
ALSParams
StorageLevel
. Cannot be "NONE".
Default: "MEMORY_AND_DISK".
intermediateStorageLevel
in interface ALSParams
public Param<String> finalStorageLevel()
ALSParams
StorageLevel
.
Default: "MEMORY_AND_DISK".
finalStorageLevel
in interface ALSParams
public final LongParam seed()
HasSeed
public final IntParam checkpointInterval()
HasCheckpointInterval
checkpointInterval
in interface HasCheckpointInterval
public final DoubleParam regParam()
HasRegParam
regParam
in interface HasRegParam
public final IntParam maxIter()
HasMaxIter
maxIter
in interface HasMaxIter
public Param<String> userCol()
ALSModelParams
userCol
in interface ALSModelParams
public Param<String> itemCol()
ALSModelParams
itemCol
in interface ALSModelParams
public Param<String> coldStartStrategy()
ALSModelParams
coldStartStrategy
in interface ALSModelParams
public final IntParam blockSize()
HasBlockSize
blockSize
in interface HasBlockSize
public final Param<String> predictionCol()
HasPredictionCol
predictionCol
in interface HasPredictionCol
public String uid()
Identifiable
uid
in interface Identifiable
public ALS setRank(int value)
public ALS setNumUserBlocks(int value)
public ALS setNumItemBlocks(int value)
public ALS setImplicitPrefs(boolean value)
public ALS setAlpha(double value)
public ALS setUserCol(String value)
public ALS setItemCol(String value)
public ALS setRatingCol(String value)
public ALS setPredictionCol(String value)
public ALS setMaxIter(int value)
public ALS setRegParam(double value)
public ALS setNonnegative(boolean value)
public ALS setCheckpointInterval(int value)
public ALS setSeed(long value)
public ALS setIntermediateStorageLevel(String value)
public ALS setFinalStorageLevel(String value)
public ALS setColdStartStrategy(String value)
public ALS setBlockSize(int value)
value
- (undocumented)public ALS setNumBlocks(int value)
value
- (undocumented)public ALSModel fit(Dataset<?> dataset)
Estimator
public StructType transformSchema(StructType schema)
PipelineStage
We check validity for interactions between parameters during transformSchema
and
raise an exception if any parameter value is invalid. Parameter value checks which
do not depend on other parameters are handled by Param.validate()
.
Typical implementation should first conduct verification on schema change and parameter validity, including complex parameter interaction checks.
transformSchema
in class PipelineStage
schema
- (undocumented)