Originariamente inviato da fuoricorso
avevo pensato che una volta che il client(cioè il thread del server) faceva la get(...) sulla mappa, usava il semaforo della chiave per sincronizzare una modifica concorrente, quindi dovrei gestire la sincronizzazione nella modifica di una stessa chiave
Volendo si può usare il Lock sull'oggetto associato ad una determinata chiave.
Nell'esempio ho introdotto tale classe (rimpiazza l'oggetto String):
Message.java
codice:
/**
* @(#)Message.java
*
*
* @author Vincenzo
* @version 1.00 2011/6/7
*/
import java.util.concurrent.locks.*;
public class Message {
private String message;
private final Lock lock;
/**
* Creates a new instance of <code>Message</code>.
*/
public Message(String message) {
this.lock = new ReentrantLock();
this.message = message;
}
/*
* @param message
* @throws RuntimeException
*/
public void setMessage(String message) {
if( this.lock.tryLock() ) {
try {
// ad esempio il lavoro dura 10 sec
try{
Thread.sleep(10000);
this.message = message;
} catch (InterruptedException e){
e.printStackTrace();
}
} finally {
this.lock.unlock();
}
} else
throw
new RuntimeException("setMessage: Message already locked by another thread!");
}
/*
* @return message if not locked
* @throws RuntimeException
*/
public String getMessage() {
if( ((ReentrantLock)this.lock).isLocked() )
throw new RuntimeException("getMessage: Message already locked by another thread!");
return this.message;
}
@Override
public String toString(){
return this.message;
}
}
che, come puoi vedere, utilizza un oggetto ReentrantLock.
Quindi Server e Client li ho modificati come segue:
Server.java
codice:
/**
* @(#)Server.java
*
*
* @author Vincenzo
* @version 1.00 2011/6/7
*/
import java.io.*;
import java.net.*;
import java.util.concurrent.*;
public class Server {
private ServerSocket ss;
private ConcurrentLinkedQueue<Client> process;
private ConcurrentHashMap<Integer, Message> map;
/**
* Creates a new instance of <code>Server</code>.
*/
public Server(int port) {
try {
this.ss = new ServerSocket(port);
this.map = new ConcurrentHashMap<Integer, Message>();
this.process = new ConcurrentLinkedQueue<Client>();
} catch ( IOException e ){
e.printStackTrace();
}
}
public void start() throws IOException{
System.out.println("Server started..\n");
while( true ){
Socket s = null;
if( (s = this.ss.accept()) != null )
new Client(s).start();
}
}
/**
* Client
* @author Vincenzo
*/
private class Client extends Thread{
private ObjectOutputStream oos;
private ObjectInputStream ois;
public Client(Socket s) throws IOException {
this.oos = new ObjectOutputStream( s.getOutputStream() );
this.ois = new ObjectInputStream( s.getInputStream() );
process.add(this);
System.out.println("New connection: " + s);
}
@Override
public void run() {
try {
try {
String protocol = null;
while( (protocol = (String)this.ois.readObject()) != null ){
if( protocol.equals("SET") ){
int key = (Integer)this.ois.readObject();
String message = (String)this.ois.readObject();
try {
Message m = null;
if( (m = map.get(key)) == null )
map.put(key, new Message(message));
else
m.setMessage(message);
// notifica all'utente
this.oos.writeObject( (m != null ? "OK you have modified message with key "
: "OK you have entered new message with key ") + key);
// output map
System.out.println("\nUltima modifica by: " + this);
System.out.println("Map:");
for(Integer k : map.keySet())
System.out.println("- " + k + " : " + map.get(k));
System.out.println();
} catch ( RuntimeException e ){
this.oos.writeObject( e.getMessage() );
System.out.println( "Access not allowed to " + this + "\n" + e.getMessage() + ", key: " + key );
}
} else if ( protocol.equals("GET") ){
try {
Message message = map.get((Integer)this.ois.readObject());
if( message != null )
this.oos.writeObject( message.getMessage() );
else
this.oos.writeObject("ERROR inexistent key.");
// situazione dei processi
System.out.println("\nSituazione processi:");
for(Client p : process)
System.out.println("* " + p + ", alive = " + p.isAlive());
System.out.println();
} catch ( RuntimeException e ){
this.oos.writeObject( "ERROR " + e.getMessage() );
System.out.println( "Access not allowed to " + this + "\n" + e.getMessage() );
}
}
}
} finally {
this.oos.close();
this.ois.close();
}
} catch ( Exception e ){
System.out.println( e.getMessage() + ":" + this );
}
}
}
/**
* @param args the command line arguments
*/
public static void main(String[] args) {
// TODO code application logic here
try {
new Server(8000).start();
} catch ( IOException e ){
e.printStackTrace();
}
}
}
Client.java
codice:
/**
* @(#)Client.java
*
*
* @author Vincenzo
* @version 1.00 2011/6/7
*/
import java.io.*;
import java.net.*;
import java.util.Scanner;
public class Client {
/**
* @param args the command line arguments
*/
public static void main(String[] args) {
// TODO code application logic here
try {
Socket s = new Socket("localhost", 8000);
ObjectOutputStream oos = new ObjectOutputStream( s.getOutputStream() );
ObjectInputStream ois = new ObjectInputStream( s.getInputStream() );
System.out.println("Connesso con il server!\n");
Scanner scan = new Scanner(System.in);
while( true ){
System.out.println("1 - Inserisci/modifica valore di una chiave");
System.out.println("2 - Ottieni il valore di una chiave");
System.out.print("Scelta: " );
int choice = Integer.parseInt( scan.nextLine() );
int key;
String message;
switch ( choice ){
case 1:
System.out.print("\nKey: ");
key = Integer.parseInt( scan.nextLine() );
System.out.print("Message: ");
message = scan.nextLine();
oos.writeObject("SET");
oos.writeObject(key);
oos.writeObject(message);
message = (String)ois.readObject();
if( !message.contains("OK") )
System.out.println("Server after dispatch says: " + message);
else
System.out.println("YEAH! Message for key " + key + " accepted!");
break;
case 2:
System.out.print("\nKey to read: ");
key = Integer.parseInt( scan.nextLine() );
oos.writeObject("GET");
oos.writeObject(key);
message = (String)ois.readObject();
if ( message.contains("ERROR") )
System.out.println("Server after request says: " + message);
else
System.out.println("YEAH! I took this message for key " + key + ": " + message);
break;
}
System.out.println();
}
} catch ( Exception e ){
e.printStackTrace();
}
}
}
In pratica se un thread sta settando un messaggio con chiave k e nello stesso istante t (nell'esempio varia in [0, 10] sec) un secondo thread tenta di accedere o settare il contenuto della medesima chiave k, non può farlo finchè il thread che lo precede non finisce.