Skip to content

Commit

Permalink
Improve buffer reader speed. (#2188)
Browse files Browse the repository at this point in the history
* 1. Sleep 500 milliseconds after a batch re-call finish.
2. Re-create register lock index when the system property named debug is setting.
3. Get the register inventory before lock to avoid increment the sequence but not use.
4. Return the exchange flag after all the references and spans in one segment parsed to reduce the number of segment parse.
5. Put the not exchanged segment into a collection then try to exchange no more than 10 times because of the exchange is asynchronous.
6. Cache the segment object to avoid repeated deserialization.

#2185

* #2185
1. Sleep 500 milliseconds after a batch re-call finish.
2. Re-create register lock index when the system property named debug is setting.
3. Get the register inventory before lock to avoid increment the sequence but not use.
  • Loading branch information
peng-yongsheng authored and wu-sheng committed Jan 21, 2019
1 parent 9c08c3e commit 8b50343
Show file tree
Hide file tree
Showing 12 changed files with 283 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,25 +70,36 @@ private void onWork(RegisterSource registerSource) {

if (sources.size() > 1000 || registerSource.getEndOfBatchContext().isEndOfBatch()) {
sources.values().forEach(source -> {
int sequence;
if ((sequence = registerLockDAO.tryLockAndIncrement(scope)) != Const.NONE) {
try {
RegisterSource dbSource = registerDAO.get(modelName, source.id());
if (Objects.nonNull(dbSource)) {
if (dbSource.combine(source)) {
registerDAO.forceUpdate(modelName, dbSource);
try {
RegisterSource dbSource = registerDAO.get(modelName, source.id());
if (Objects.nonNull(dbSource)) {
if (dbSource.combine(source)) {
registerDAO.forceUpdate(modelName, dbSource);
}
} else {
int sequence;
if ((sequence = registerLockDAO.tryLockAndIncrement(scope)) != Const.NONE) {
try {
dbSource = registerDAO.get(modelName, source.id());
if (Objects.nonNull(dbSource)) {
if (dbSource.combine(source)) {
registerDAO.forceUpdate(modelName, dbSource);
}
} else {
source.setSequence(sequence);
registerDAO.forceInsert(modelName, source);
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
} finally {
registerLockDAO.releaseLock(scope);
}
} else {
source.setSequence(sequence);
registerDAO.forceInsert(modelName, source);
logger.info("{} inventory register try lock and increment sequence failure.", scope.name());
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
} finally {
registerLockDAO.releaseLock(scope);
}
} else {
logger.info("{} inventory register try lock and increment sequence failure.", scope.name());
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
});
sources.clear();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.oap.server.library.buffer;

import com.google.protobuf.GeneratedMessageV3;
import lombok.*;
import org.apache.skywalking.apm.network.language.agent.TraceSegmentObject;
import org.apache.skywalking.apm.network.language.agent.v2.SegmentObject;

/**
* @author peng-yongsheng
*/
@Getter
public class BufferData<MESSAGE_TYPE extends GeneratedMessageV3> {
private MESSAGE_TYPE messageType;
@Setter private TraceSegmentObject v1Segment;
@Setter private SegmentObject v2Segment;

public BufferData(MESSAGE_TYPE messageType) {
this.messageType = messageType;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.oap.server.library.buffer;

import com.google.protobuf.GeneratedMessageV3;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
* @author peng-yongsheng
*/
public class BufferDataCollection<MESSAGE_TYPE extends GeneratedMessageV3> {

private AtomicInteger index = new AtomicInteger(0);
private final List<BufferData<MESSAGE_TYPE>> bufferDataList;

public BufferDataCollection(int size) {
this.bufferDataList = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
bufferDataList.add(null);
}
}

public void add(BufferData<MESSAGE_TYPE> bufferData) {
bufferDataList.set(index.getAndIncrement(), bufferData);

}

public int size() {
return index.get();
}

public synchronized List<BufferData<MESSAGE_TYPE>> export() {
List<BufferData<MESSAGE_TYPE>> exportData = new ArrayList<>(index.get());
for (int i = 0; i < index.get(); i++) {
exportData.add(bufferDataList.get(i));
}
index.set(0);
return exportData;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ public Builder<MESSAGE_TYPE> parser(Parser<MESSAGE_TYPE> parser) {
return this;
}

public Builder<MESSAGE_TYPE> callBack(DataStreamReader.CallBack<MESSAGE_TYPE> callBack) {
public Builder<MESSAGE_TYPE> callBack(
DataStreamReader.CallBack<MESSAGE_TYPE> callBack) {
this.callBack = callBack;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import com.google.protobuf.*;
import java.io.*;
import java.util.Objects;
import java.util.*;
import java.util.concurrent.*;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.PrefixFileFilter;
Expand All @@ -38,6 +38,8 @@ public class DataStreamReader<MESSAGE_TYPE extends GeneratedMessageV3> {
private final Offset.ReadOffset readOffset;
private final Parser<MESSAGE_TYPE> parser;
private final CallBack<MESSAGE_TYPE> callBack;
private final int collectionSize = 100;
private final BufferDataCollection<MESSAGE_TYPE> bufferDataCollection;
private File readingFile;
private InputStream inputStream;

Expand All @@ -47,6 +49,7 @@ public class DataStreamReader<MESSAGE_TYPE extends GeneratedMessageV3> {
this.readOffset = readOffset;
this.parser = parser;
this.callBack = callBack;
this.bufferDataCollection = new BufferDataCollection<>(collectionSize);
}

void initialize() {
Expand Down Expand Up @@ -114,33 +117,64 @@ private void read() {
}

while (readOffset.getOffset() < readingFile.length()) {
BufferData<MESSAGE_TYPE> bufferData = new BufferData<>(parser.parseDelimitedFrom(inputStream));

MESSAGE_TYPE messageType = parser.parseDelimitedFrom(inputStream);
if (messageType != null) {
int i = 0;
while (!callBack.call(messageType)) {
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
logger.error(e.getMessage());
}
if (bufferData.getMessageType() != null) {
boolean isComplete = callBack.call(bufferData);
final int serialized = bufferData.getMessageType().getSerializedSize();
final int offset = CodedOutputStream.computeUInt32SizeNoTag(serialized) + serialized;
readOffset.setOffset(readOffset.getOffset() + offset);

i++;
if (i == 10) {
break;
if (!isComplete) {
if (bufferDataCollection.size() == collectionSize) {
reCall();
}
bufferDataCollection.add(bufferData);
}

if (logger.isDebugEnabled()) {
logger.debug("collection size: {}, max size: {}", bufferDataCollection.size(), collectionSize);
}
} else if (bufferDataCollection.size() > 0) {
reCall();
} else {
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
final int serialized = messageType.getSerializedSize();
final int offset = CodedOutputStream.computeUInt32SizeNoTag(serialized) + serialized;
readOffset.setOffset(readOffset.getOffset() + offset);
}
}
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}

private void reCall() {
int maxCycle = 10;
for (int i = 1; i <= maxCycle; i++) {
if (bufferDataCollection.size() > 0) {
List<BufferData<MESSAGE_TYPE>> bufferDataList = bufferDataCollection.export();
for (BufferData<MESSAGE_TYPE> data : bufferDataList) {
if (!callBack.call(data)) {
if (i != maxCycle) {
bufferDataCollection.add(data);
}
}
}

try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
} else {
break;
}
}
}

public interface CallBack<MESSAGE_TYPE extends GeneratedMessageV3> {
boolean call(MESSAGE_TYPE message);
boolean call(BufferData<MESSAGE_TYPE> bufferData);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ public static void main(String[] args) throws IOException, InterruptedException
builder.dataFileMaxSize(50);
builder.offsetFileMaxSize(10);
builder.parser(TraceSegmentObject.parser());
builder.callBack(new SegmentParse());
builder.callBack(bufferData -> {
logger.info("segment parse: {}", bufferData.getMessageType().getSpans(0).getSpanId());
return false;
});

BufferStream<TraceSegmentObject> stream = builder.build();
stream.initialize();
Expand All @@ -62,14 +65,5 @@ public static void main(String[] args) throws IOException, InterruptedException
TimeUnit.MILLISECONDS.sleep(50);
}
}

}

private static class SegmentParse implements DataStreamReader.CallBack<TraceSegmentObject> {

@Override public boolean call(TraceSegmentObject message) {
logger.info("segment parse: {}", message.getSpans(0).getSpanId());
return true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.apm.network.servicemesh.ServiceMeshMetric;
import org.apache.skywalking.oap.server.library.buffer.BufferStream;
import org.apache.skywalking.oap.server.library.buffer.DataStreamReader;
import org.apache.skywalking.oap.server.library.buffer.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*;
Expand Down Expand Up @@ -97,11 +96,11 @@ public void in(ServiceMeshMetric metric) {
/**
* File buffer callback. Block reading from buffer file, until metadata register done.
*
* @param message
* @param bufferData
* @return
*/
@Override public boolean call(ServiceMeshMetric message) {
ServiceMeshMetricDataDecorator decorator = new ServiceMeshMetricDataDecorator(message);
@Override public boolean call(BufferData<ServiceMeshMetric> bufferData) {
ServiceMeshMetricDataDecorator decorator = new ServiceMeshMetricDataDecorator(bufferData.getMessageType());
if (decorator.tryMetaDataRegister()) {
meshBufferFileOut.inc();
TelemetryDataDispatcher.doDispatch(decorator);
Expand Down
Loading

0 comments on commit 8b50343

Please sign in to comment.