Description
SynapseML version
1.0.4
System information
- Language version (e.g. python 3.8, scala 2.12):
- Spark Version (e.g. 3.2.3):
- Spark Platform (e.g. Synapse, Databricks):
Describe the problem
I reviewed the port-binding code. It works well on a physical machine. However, when YARN runs tasks in virtual containers that share one physical machine, the bind sometimes fails, because the code finds an open port, closes it, and only binds that port again some time later.
When n tasks run on the same physical machine and do not bind their ports at the same moment, this implementation can cause a bind failure: another task can take the port during that gap.
Code to reproduce issue
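No reproduction code was attached to this report. The following is a minimal standalone sketch of the find-close-rebind pattern described above, using only `java.net.ServerSocket`; it is a hypothetical illustration, not SynapseML code. It simulates two tasks on one host inside a single JVM: task A probes for a free port and closes the probe, task B binds that port in the meantime, and task A's later bind then fails with `BindException` on a typical Linux host. The names `PortRaceDemo`, `findFreePort` and `tryBind` are illustrative only.

```scala
import java.net.{BindException, InetSocketAddress, ServerSocket}

object PortRaceDemo {
  // Hypothetical helper: ask the OS for a free port by binding port 0,
  // read the assigned port, then close the probe socket (find, then close).
  def findFreePort(): Int = {
    val probe = new ServerSocket(0)
    try probe.getLocalPort finally probe.close()
  }

  // Hypothetical helper: try to bind `port`; true on success, false on BindException.
  def tryBind(port: Int): Boolean = {
    val socket = new ServerSocket()
    socket.setReuseAddress(false)
    try {
      socket.bind(new InetSocketAddress(port))
      true
    } catch {
      case _: BindException => false
    } finally {
      socket.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val port = findFreePort()              // task A: pick a port and release it
    val otherTask = new ServerSocket(port) // task B on the same host binds it in the gap
    try {
      // task A: later tries to bind the port it "found" earlier
      println(s"task A rebind of port $port succeeded: ${tryBind(port)}")
    } finally {
      otherTask.close()
    }
  }
}
```

Running `main` on such a host should report that task A's rebind did not succeed, which is the failure mode described in this issue.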
Other info / logs
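```scala
def getGlobalNetworkInfo(ctx: TrainingContext,
                         log: Logger,
                         taskId: Long,
                         partitionId: Int,
                         shouldExecuteTraining: Boolean,
                         measures: TaskInstrumentationMeasures): NetworkTopologyInfo = {
  measures.markNetworkInitializationStart()
  val networkParams = ctx.networkParams
  // findOpenPort yields a bound socket, and `using` closes it when this block exits.
  // Only the port number survives; per this report, the port is bound again later,
  // which leaves a window in which another task on the same host can take it.
  val out = using(findOpenPort(ctx, log).get) {
    openPort =>
      val localListenPort = openPort.getLocalPort
      log.info(s"LightGBM task ...") // remainder of this log line was cut off in the report
      FaultToleranceUtils.retryWithTimeout() {
        getNetworkTopologyInfoFromDriver(networkParams,
          taskId,
          partitionId,
          localListenPort,
          log,
          shouldExecuteTraining)
      }
  }.get
  measures.markNetworkInitializationStop()
  out
}
```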
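A small sketch of why the port is exposed: in `getGlobalNetworkInfo`, the `using(findOpenPort(ctx, log).get) { openPort => ... }` block hands out `openPort.getLocalPort`, but the socket itself is closed once the block exits. Below, Scala 2.13's `scala.util.Using` stands in for SynapseML's own `using` helper (an assumption that both close the resource when the block returns), and `PortOutlivesSocket` is a hypothetical name. The sketch only shows that the port number outlives the socket that reserved it.

```scala
import java.net.ServerSocket
import scala.util.Using

object PortOutlivesSocket {
  // Stand-in for `using(findOpenPort(ctx, log).get) { openPort => ... }`:
  // scala.util.Using closes the socket when the block returns,
  // but the port number (and here the socket reference) escape the block.
  def portAfterBlock(): (Int, Boolean) = {
    val (port, socket) = Using(new ServerSocket(0)) { s =>
      (s.getLocalPort, s)
    }.get
    // The port number is still known, but no socket holds the port any more.
    (port, socket.isClosed)
  }

  def main(args: Array[String]): Unit = {
    val (port, closed) = portAfterBlock()
    println(s"port $port, socket closed after block: $closed")
  }
}
```

Anything that later binds `port` is therefore competing with every other process on the host for it.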
```scala
def mapPartitionTask(ctx: TrainingContext)(inputRows: Iterator[Row]): Iterator[PartitionResult] = {
  // Start with initialization
  val taskCtx = initialize(ctx, inputRows)
  if (taskCtx.isEmptyPartition) {
    log.warn("LightGBM task encountered empty partition, for best performance ensure no partitions are empty")
    Array { PartitionResult(None, taskCtx.measures) }.toIterator
  } else {
    // Perform any data preparation work
    val dataIntermediateState = preparePartitionData(taskCtx, inputRows)
    try {
      if (taskCtx.shouldExecuteTraining) {
        // If participating in training, initialize the network ring of communication
        NetworkManager.initLightGBMNetwork(taskCtx, log)
        if (ctx.useSingleDatasetMode) {
          log.info(s"Waiting for all data prep to be done, task ${taskCtx.taskId}, partition ${taskCtx.partitionId}")
          ctx.sharedState().dataPreparationDoneSignal.await()
        }
        // Create the final Dataset for training and execute training iterations
        finalizeDatasetAndTrain(taskCtx, dataIntermediateState)
      } else {
        log.info(s"Helper task ${taskCtx.taskId}, partition ${taskCtx.partitionId} finished processing rows")
        ctx.sharedState().dataPreparationDoneSignal.countDown()
        Array { PartitionResult(None, taskCtx.measures) }.toIterator
      }
    } finally {
      cleanup(taskCtx)
    }
  }
}
```
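In `mapPartitionTask`, tasks on one executor coordinate through `dataPreparationDoneSignal`: helper tasks call `countDown()` after processing their rows, and a training task in `useSingleDatasetMode` calls `await()` before building the final Dataset. A minimal sketch of that handshake, assuming the signal behaves like a `java.util.concurrent.CountDownLatch` sized to the number of helper tasks (the sizing, the timeout and the `HelperLatchDemo` name are hypothetical; pool threads stand in for Spark tasks):

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

object HelperLatchDemo {
  // Returns true if the "training task" saw every helper signal before the timeout.
  def coordinate(helpers: Int): Boolean = {
    require(helpers > 0, "need at least one helper")
    // Stand-in for sharedState().dataPreparationDoneSignal (assumed to be a CountDownLatch).
    val dataPreparationDone = new CountDownLatch(helpers)
    val pool = Executors.newFixedThreadPool(helpers)
    try {
      (1 to helpers).foreach { _ =>
        pool.execute(new Runnable {
          // Helper task: finish processing its rows, then signal.
          def run(): Unit = dataPreparationDone.countDown()
        })
      }
      // Training task: block until all helpers have signalled (bounded wait for the demo).
      dataPreparationDone.await(10, TimeUnit.SECONDS)
    } finally {
      pool.shutdown()
    }
  }

  def main(args: Array[String]): Unit =
    println(s"all helpers done: ${coordinate(3)}")
}
```

The bounded `await(timeout, unit)` is only there so the sketch cannot hang; the code in this issue calls the unbounded `await()`.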
What component(s) does this bug affect?
- area/cognitive: Cognitive project
- area/core: Core project
- area/deep-learning: DeepLearning project
- area/lightgbm: Lightgbm project
- area/opencv: Opencv project
- area/vw: VW project
- area/website: Website
- area/build: Project build system
- area/notebooks: Samples under notebooks folder
- area/docker: Docker usage
- area/models: models related issue
What language(s) does this bug affect?
- language/scala: Scala source code
- language/python: Pyspark APIs
- language/r: R APIs
- language/csharp: .NET APIs
- language/new: Proposals for new client languages
What integration(s) does this bug affect?
- integrations/synapse: Azure Synapse integrations
- integrations/azureml: Azure ML integrations
- integrations/databricks: Databricks integrations