
从JNI获取的Direct Buffer数据直接上传至S3,可以有效避免不必要的内存拷贝,提高程序性能。通常,从JNI获取的Direct Buffer需要先拷贝到JVM堆内存中,再进行上传操作。但这种方式会增加内存占用,并引入额外的拷贝开销。为了解决这个问题,我们可以利用jclouds库提供的ByteSource接口,自定义一个ByteSource实现,直接将Direct Buffer包装成输入流,从而避免了数据拷贝。
以下是具体实现步骤:
创建自定义的ByteBufferByteSource类
该类继承自ByteSource,并持有一个ByteBuffer对象。openStream()方法返回一个自定义的ByteBufferInputStream,用于从ByteBuffer中读取数据。
import com.google.common.io.ByteSource;
import java.io.IOException;
import java.io.InputStream;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import static com.google.common.base.Preconditions.checkNotNull;
public class ByteBufferByteSource extends ByteSource {
private final ByteBuffer buffer;
public ByteBufferByteSource(ByteBuffer buffer) {
this.buffer = checkNotNull(buffer);
}
@Override
public InputStream openStream() {
return new ByteBufferInputStream(buffer);
}
private static final class ByteBufferInputStream extends InputStream {
private final ByteBuffer buffer;
private boolean closed = false;
ByteBufferInputStream(ByteBuffer buffer) {
this.buffer = buffer;
}
@Override
public synchronized int read() throws IOException {
if (closed) {
throw new IOException("Stream already closed");
}
try {
return buffer.get() & 0xFF; // Important: convert byte to unsigned int
} catch (BufferUnderflowException bue) {
return -1;
}
}
@Override
public synchronized int read(byte[] b, int off, int len) throws IOException {
if (closed) {
throw new IOException("Stream already closed");
}
if (b == null) {
throw new NullPointerException();
} else if (off < 0 || len < 0 || len > b.length - off) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return 0;
}
int available = buffer.remaining();
if (available == 0) {
return -1;
}
int readLength = Math.min(len, available);
buffer.get(b, off, readLength);
return readLength;
}
@Override
public void close() throws IOException {
super.close();
closed = true;
}
}
}注意事项:
使用自定义的ByteBufferByteSource上传数据
在上传数据时,使用ByteBufferByteSource包装Direct Buffer,并将其作为payload传递给blobBuilder。
import org.jclouds.ContextBuilder;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.domain.Blob;
import java.nio.ByteBuffer;
public String uploadDirectBuffer(String container, String objectKey, ByteBuffer bb) {
BlobStoreContext context = ContextBuilder.newBuilder("aws-s3") // 替换为你的S3 provider
.credentials("YOUR_ACCESS_KEY", "YOUR_SECRET_KEY") // 替换为你的凭证
.buildView(BlobStoreContext.class);
BlobStore blobStore = context.getBlobStore();
ByteBufferByteSource byteSource = new ByteBufferByteSource(bb);
Blob blob = blobStore.blobBuilder(objectKey)
.payload(byteSource)
.contentLength(bb.capacity())
.build();
blobStore.putBlob(container, blob);
context.close();
return objectKey;
}注意事项:
JNI代码示例
#include <jni.h>
#include <iostream>
#include <sys/mman.h>
#include <fcntl.h>
#include <unistd.h>
#define SHM_NAME "/my_shared_memory"
#define SHM_SIZE 200 * 1024 * 1024 // 200MB
JNIEXPORT jobject JNICALL Java_service_SharedMemoryJNIService_getDirectByteBuffer(JNIEnv *env, jclass clazz, jlong buf_addr, jint buf_len) {
return env->NewDirectByteBuffer((void *)buf_addr, buf_len);
}
JNIEXPORT jlong JNICALL Java_service_SharedMemoryJNIService_createSharedMemory(JNIEnv *env, jclass clazz) {
int shm_fd = shm_open(SHM_NAME, O_CREAT | O_RDWR, 0666);
if (shm_fd == -1) {
perror("shm_open");
return -1;
}
if (ftruncate(shm_fd, SHM_SIZE) == -1) {
perror("ftruncate");
close(shm_fd);
return -1;
}
void *ptr = mmap(0, SHM_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
if (ptr == MAP_FAILED) {
perror("mmap");
close(shm_fd);
return -1;
}
close(shm_fd); // No longer needed after mmap
return (jlong)ptr;
}
JNIEXPORT void JNICALL Java_service_SharedMemoryJNIService_writeToSharedMemory(JNIEnv *env, jclass clazz, jlong buf_addr, jint data_size) {
char *ptr = (char *)buf_addr;
for (int i = 0; i < data_size; ++i) {
ptr[i] = 'A' + (i % 26); // Example data
}
}
JNIEXPORT void JNICALL Java_service_SharedMemoryJNIService_unmapSharedMemory(JNIEnv *env, jclass clazz, jlong buf_addr) {
void *ptr = (void *)buf_addr;
if (munmap(ptr, SHM_SIZE) == -1) {
perror("munmap");
}
}
JNIEXPORT void JNICALL Java_service_SharedMemoryJNIService_deleteSharedMemory(JNIEnv *env, jclass clazz) {
if (shm_unlink(SHM_NAME) == -1) {
perror("shm_unlink");
}
}Java代码示例
package service;
import java.nio.ByteBuffer;
public class SharedMemoryJNIService {
static {
System.loadLibrary("sharedmemory"); // Load the sharedmemory library
}
public native ByteBuffer getDirectByteBuffer(long buf_addr, int buf_len);
public native long createSharedMemory();
public native void writeToSharedMemory(long buf_addr, int data_size);
public native void unmapSharedMemory(long buf_addr);
public native void deleteSharedMemory();
public static void main(String[] args) {
SharedMemoryJNIService service = new SharedMemoryJNIService();
long sharedMemoryAddress = service.createSharedMemory();
if (sharedMemoryAddress == -1) {
System.err.println("Failed to create shared memory.");
return;
}
int dataSize = 50 * 1024 * 1024; // 50MB
service.writeToSharedMemory(sharedMemoryAddress, dataSize);
ByteBuffer directBuffer = service.getDirectByteBuffer(sharedMemoryAddress, dataSize);
// Example usage with the S3 uploader
S3Uploader s3Uploader = new S3Uploader(); // Replace with your S3 uploader class
String objectKey = "my-object";
String bucketName = "my-bucket";
try {
String uploadedKey = s3Uploader.uploadDirectBuffer(bucketName, objectKey, directBuffer);
System.out.println("Uploaded to S3: " + uploadedKey);
} catch (Exception e) {
System.err.println("Failed to upload to S3: " + e.getMessage());
e.printStackTrace();
} finally {
service.unmapSharedMemory(sharedMemoryAddress);
service.deleteSharedMemory();
}
}
}注意事项:
总结
通过自定义ByteSource,我们可以避免将Direct Buffer数据拷贝到JVM堆内存,从而实现高效的数据上传。这种方法不仅减少了内存占用,还降低了拷贝开销,提升了程序性能。在实际应用中,可以根据具体需求对ByteBufferInputStream进行优化,例如实现更高效的读取方式。
以上就是无需拷贝:直接将JNI Direct Buffer数据上传至S3的优化方案的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号