Conversation
Co-authored-by: Ehco <zh19960202@gmail.com>
Co-authored-by: Ehco <zh19960202@gmail.com>
| chanSize := s.cfg.QueueSize * s.cfg.WorkerCount / 2 | ||
| if s.cfg.Compact { | ||
| chanSize /= 2 | ||
| } | ||
| s.dmlJobCh = make(chan *job, chanSize) |
There was a problem hiding this comment.
DM originally cached s.cfg.QueueSize * s.cfg.WorkerCount dml jobs in memory. Now if compact: false, dmlJobCh will dmlWorker will both cached s.cfg.QueueSize * s.cfg.WorkerCount/2 jobs. If compact: true, dmlJobCh, compactor buffer, compactor output channel and dmlWorker will all cached s.cfg.QueueSize * s.cfg.WorkerCount/4 jobs.
Actually we can use a larger compact buffer-size, but if so, when user pause-task/stop-task, they may need to wait a longer time to wait all jobs flushed.
There was a problem hiding this comment.
seems causality output channel didn't get adjusted
Ehco1996
left a comment
There was a problem hiding this comment.
LGTM
if all chanSize and bufferSize is accept by other reviews, btw pelease add some comments when the parameters are determined
|
upgrade failed due to https://github.com/pingcap/dm/issues/1961 |
lance6716
left a comment
There was a problem hiding this comment.
will review after food
| *delDML = *updateDML | ||
| delDML.op = del | ||
| // use oldValues of update as values of delete and reset oldValues | ||
| delDML.values = updateDML.oldValues |
There was a problem hiding this comment.
maybe in future we can remove the op of DML,
swicth DML:
only has new value: it's an INSERT
only has old value: it's a DELETE
else: it's a UPDATE
There was a problem hiding this comment.
Maybe some problems like a table with all columns are generate columns, so new value and old value will both be null in insert statement.
There was a problem hiding this comment.
maybe there should be at least one column, to let other columns generated from it.
There was a problem hiding this comment.
I think this kind of implicit opType is no good for the readbility if our code? Are there any harm to reserve the explicit op field?
| metrics.QueueSizeGauge.WithLabelValues(c.task, "compactor_input", c.source).Set(float64(len(c.inCh))) | ||
|
|
||
| if j.tp == flush { | ||
| c.flushBuffer() |
There was a problem hiding this comment.
this will block the consumption of c.inCh. if we has a lot of jobs to be flushed, we are now blocked to wait every job sent to next stage. This will increase the risk of upstream read timeout.
Maybe we can implement ping-pong buffer in future.
There was a problem hiding this comment.
I think this is not the same problem. Upstream read timeout may also caused by the dmlJobCh is full. And after we support async flush checkpoint, we only need flush job when stop-task/pause-task?
There was a problem hiding this comment.
when DM write 1 job per second to downstream and we have 100 jobs in compactor to flush, we now have to wait 100 seconds before reading upstream. Without compactor we only wait 1 second.
There was a problem hiding this comment.
Co-authored-by: lance6716 <lance6716@gmail.com>
lance6716
left a comment
There was a problem hiding this comment.
will review tests later
| chanSize := s.cfg.QueueSize * s.cfg.WorkerCount / 2 | ||
| if s.cfg.Compact { | ||
| chanSize /= 2 | ||
| } | ||
| s.dmlJobCh = make(chan *job, chanSize) |
There was a problem hiding this comment.
seems causality output channel didn't get adjusted
| } | ||
|
|
||
| for i := 0; i < len(values); i++ { | ||
| if values[i] != oldValues[i] { |
There was a problem hiding this comment.
we're comparing interface{}, so I guess they are not equal for most time?
There was a problem hiding this comment.
Interface values are comparable. Two interface values are equal if they have identical dynamic types and equal dynamic values or if both have value nil.
https://stackoverflow.com/questions/34245932/checking-equality-of-interface
I think causality output channel no need to adjust, because it has no buffer and default queue size 1024 is enough. |
| if c.safeMode { | ||
| j.dml.safeMode = true | ||
| } |
There was a problem hiding this comment.
| if c.safeMode { | |
| j.dml.safeMode = true | |
| } | |
| j.dml.safeMode = c.safeMode |
| key := j.dml.identifyKey() | ||
| prevPos, ok := tableKeyMap[key] | ||
| // if no such key in the buffer, add it | ||
| if !ok || prevPos >= len(c.buffer) { |
There was a problem hiding this comment.
How can prevPos bigger than buffer length
There was a problem hiding this comment.
just avoid panic. have removed it
| *delDML = *updateDML | ||
| delDML.op = del | ||
| // use oldValues of update as values of delete and reset oldValues | ||
| delDML.values = updateDML.oldValues |
There was a problem hiding this comment.
I think this kind of implicit opType is no good for the readbility if our code? Are there any harm to reserve the explicit op field?
|
|
||
| // dmlWorkerWrap creates and runs a dmlWorker instance and returns flush job channel. | ||
| func dmlWorkerWrap(inCh chan *job, syncer *Syncer) chan *job { | ||
| chanSize := syncer.cfg.QueueSize / 2 |
There was a problem hiding this comment.
Please add a comment for this special logic? I'm curious if we can change the default value of this field to avoid this change. BTW, doesn't anyone know the effect of the chan size change? How can a user know he/she need to adjust its value and what is the proper value?
There was a problem hiding this comment.
I think no user will change the queue-size... so I keep the original value and didn't add a new config item like compact-buffer-size
|
Please comment in pingcap/ticdc#3162 |
|
/run-unit-test |
What problem does this PR solve?
add compactor to compact dml
What is changed and how it works?
Check List
Tests